| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm HDFS Integration</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 1.1.2</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.0.0-SNAPSHOT/index.html">2.0.0-SNAPSHOT</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.2/index.html">1.2.2</a></li> |
| |
| |
| |
| <li><a href="/releases/1.1.2/index.html">1.1.2</a></li> |
| |
| |
| |
| |
| |
| <li><a href="/releases/1.0.6/index.html">1.0.6</a></li> |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| </ul> |
| </li> |
| <li><a href="/2018/06/04/storm122-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm HDFS Integration</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><p>Storm components for interacting with HDFS file systems</p> |
| |
| <h2 id="usage">Usage</h2> |
| |
| <p>The following example writes pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every |
| 1,000 tuples it syncs the filesystem, making that data visible to other HDFS clients. It rotates files when they |
| reach 5 megabytes in size.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// use "|" instead of "," for field delimiter</span> |
| <span class="n">RecordFormat</span> <span class="n">format</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DelimitedRecordFormat</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFieldDelimiter</span><span class="o">(</span><span class="s">"|"</span><span class="o">);</span> |
| |
| <span class="c1">// sync the filesystem after every 1k tuples</span> |
| <span class="n">SyncPolicy</span> <span class="n">syncPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CountSyncPolicy</span><span class="o">(</span><span class="mi">1000</span><span class="o">);</span> |
| |
| <span class="c1">// rotate files when they reach 5MB</span> |
| <span class="n">FileRotationPolicy</span> <span class="n">rotationPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FileSizeRotationPolicy</span><span class="o">(</span><span class="mf">5.0f</span><span class="o">,</span> <span class="n">Units</span><span class="o">.</span><span class="na">MB</span><span class="o">);</span> |
| |
| <span class="n">FileNameFormat</span> <span class="n">fileNameFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultFileNameFormat</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withPath</span><span class="o">(</span><span class="s">"/foo/"</span><span class="o">);</span> |
| |
| <span class="n">HdfsBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsBolt</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFsUrl</span><span class="o">(</span><span class="s">"hdfs://localhost:54310"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withFileNameFormat</span><span class="o">(</span><span class="n">fileNameFormat</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withRecordFormat</span><span class="o">(</span><span class="n">format</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withRotationPolicy</span><span class="o">(</span><span class="n">rotationPolicy</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withSyncPolicy</span><span class="o">(</span><span class="n">syncPolicy</span><span class="o">);</span> |
| </code></pre></div> |
| <h3 id="packaging-a-topology">Packaging a Topology</h3> |
| |
| <p>When packaging your topology, it's important that you use the <a href="">maven-shade-plugin</a> as opposed to the |
| <a href="">maven-assembly-plugin</a>.</p> |
| |
| <p>The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme |
| resolution.</p> |
| |
| <p>If you experience errors such as the following:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs |
| </code></pre></div> |
| <p>it's an indication that your topology jar file isn't packaged properly.</p> |
| |
| <p>If you use Maven to build your topology jar, use the following <code>maven-shade-plugin</code> configuration:</p> |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><plugin></span> |
| <span class="nt"><groupId></span>org.apache.maven.plugins<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>maven-shade-plugin<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>1.4<span class="nt"></version></span> |
| <span class="nt"><configuration></span> |
| <span class="nt"><createDependencyReducedPom></span>true<span class="nt"></createDependencyReducedPom></span> |
| <span class="nt"></configuration></span> |
| <span class="nt"><executions></span> |
| <span class="nt"><execution></span> |
| <span class="nt"><phase></span>package<span class="nt"></phase></span> |
| <span class="nt"><goals></span> |
| <span class="nt"><goal></span>shade<span class="nt"></goal></span> |
| <span class="nt"></goals></span> |
| <span class="nt"><configuration></span> |
| <span class="nt"><transformers></span> |
| <span class="nt"><transformer</span> |
| <span class="na">implementation=</span><span class="s">"org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"</span><span class="nt">/></span> |
| <span class="nt"><transformer</span> |
| <span class="na">implementation=</span><span class="s">"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"</span><span class="nt">></span> |
| <span class="nt"><mainClass></mainClass></span> |
| <span class="nt"></transformer></span> |
| <span class="nt"></transformers></span> |
| <span class="nt"></configuration></span> |
| <span class="nt"></execution></span> |
| <span class="nt"></executions></span> |
| <span class="nt"></plugin></span> |
| |
| </code></pre></div> |
| <h3 id="specifying-a-hadoop-version">Specifying a Hadoop Version</h3> |
| |
| <p>By default, storm-hdfs uses the following Hadoop dependencies:</p> |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.hadoop<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>hadoop-client<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>2.2.0<span class="nt"></version></span> |
| <span class="nt"><exclusions></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>org.slf4j<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>slf4j-log4j12<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"></exclusions></span> |
| <span class="nt"></dependency></span> |
| <span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.hadoop<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>hadoop-hdfs<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>2.2.0<span class="nt"></version></span> |
| <span class="nt"><exclusions></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>org.slf4j<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>slf4j-log4j12<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"></exclusions></span> |
| <span class="nt"></dependency></span> |
| </code></pre></div> |
| <p>If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency |
| and add the dependencies for your preferred version in your pom.</p> |
| |
| <p>Hadoop client version incompatibilities can manifest as errors like:</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero) |
| </code></pre></div> |
| <h2 id="customization">Customization</h2> |
| |
| <h3 id="record-formats">Record Formats</h3> |
| |
| <p>Record format can be controlled by providing an implementation of the <code>org.apache.storm.hdfs.format.RecordFormat</code> |
| interface:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">RecordFormat</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kt">byte</span><span class="o">[]</span> <span class="nf">format</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The provided <code>org.apache.storm.hdfs.format.DelimitedRecordFormat</code> is capable of producing formats such as CSV and |
| tab-delimited files.</p> |
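| <p>To make the idea of a delimited record concrete, here is a minimal, self-contained sketch of the formatting step. It is |
| not the <code>DelimitedRecordFormat</code> source: <code>DelimitedFormatSketch</code> is a hypothetical name, it takes a plain list of values |
| instead of a Storm <code>Tuple</code> so that it compiles without Storm on the classpath, and the UTF-8 encoding is an assumption.</p> |

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in, for illustration only; it is not part of storm-hdfs.
final class DelimitedFormatSketch {
    private final String fieldDelimiter;
    private final String recordDelimiter;

    DelimitedFormatSketch(String fieldDelimiter, String recordDelimiter) {
        this.fieldDelimiter = fieldDelimiter;
        this.recordDelimiter = recordDelimiter;
    }

    // Joins the values with the field delimiter, appends the record delimiter,
    // and returns the bytes that would be written to the file.
    byte[] format(List<?> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(fieldDelimiter);
            }
            sb.append(values.get(i));
        }
        sb.append(recordDelimiter);
        return sb.toString().getBytes(StandardCharsets.UTF_8); // encoding is an assumption
    }
}
```

| <p>A real implementation would implement <code>RecordFormat</code> and read the values from the <code>Tuple</code> it is given.</p> |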
| |
| <h3 id="file-naming">File Naming</h3> |
| |
| <p>File naming can be controlled by providing an implementation of the <code>org.apache.storm.hdfs.format.FileNameFormat</code> |
| interface:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">FileNameFormat</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">topologyContext</span><span class="o">);</span> |
| <span class="n">String</span> <span class="nf">getName</span><span class="o">(</span><span class="kt">long</span> <span class="n">rotation</span><span class="o">,</span> <span class="kt">long</span> <span class="n">timeStamp</span><span class="o">);</span> |
| <span class="n">String</span> <span class="nf">getPath</span><span class="o">();</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The provided <code>org.apache.storm.hdfs.format.DefaultFileNameFormat</code> will create file names with the following format:</p> |
| <div class="highlight"><pre><code class="language-" data-lang=""> {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension} |
| </code></pre></div> |
| <p>For example:</p> |
| <div class="highlight"><pre><code class="language-" data-lang=""> MyBolt-5-7-1390579837830.txt |
| </code></pre></div> |
| <p>By default, the prefix is empty and the extension is ".txt".</p> |
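| <p>As a rough illustration of that naming pattern, the sketch below assembles a file name from its parts. It is a |
| self-contained stand-in, not the <code>DefaultFileNameFormat</code> source: <code>FileNameSketch</code> and <code>buildName</code> are hypothetical |
| names introduced only for this example.</p> |

```java
// Hypothetical helper, for illustration only; it is not part of storm-hdfs.
final class FileNameSketch {
    private FileNameSketch() {
    }

    // Assembles {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}.
    static String buildName(String prefix, String componentId, int taskId,
                            long rotationNum, long timestamp, String extension) {
        return prefix + componentId + "-" + taskId + "-" + rotationNum
                + "-" + timestamp + extension;
    }
}
```

| <p>Called with an empty prefix, component <code>MyBolt</code>, task 5, rotation 7, timestamp 1390579837830 and extension <code>.txt</code>, |
| the helper reproduces the example name shown above.</p> |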
| |
| <h3 id="sync-policies">Sync Policies</h3> |
| |
| <p>Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available |
| to clients reading the data) by implementing the <code>org.apache.storm.hdfs.sync.SyncPolicy</code> interface:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">SyncPolicy</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kt">boolean</span> <span class="nf">mark</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="kt">long</span> <span class="n">offset</span><span class="o">);</span> |
| <span class="kt">void</span> <span class="nf">reset</span><span class="o">();</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The <code>HdfsBolt</code> will call the <code>mark()</code> method for every tuple it processes. Returning <code>true</code> will trigger the <code>HdfsBolt</code> |
| to perform a sync/flush, after which it will call the <code>reset()</code> method.</p> |
| |
| <p>The <code>org.apache.storm.hdfs.sync.CountSyncPolicy</code> class simply triggers a sync after the specified number of tuples have |
| been processed.</p> |
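| <p>The <code>mark()</code>/<code>reset()</code> contract can be sketched as follows. This is a self-contained approximation, not the |
| <code>CountSyncPolicy</code> source: <code>SyncPolicySketch</code> and <code>CountSyncSketch</code> are hypothetical names, and a generic type parameter |
| stands in for Storm's <code>Tuple</code> so the code compiles without Storm on the classpath.</p> |

```java
// Stand-in for the SyncPolicy contract; the real interface takes a Storm Tuple
// and extends Serializable.
interface SyncPolicySketch<T> {
    boolean mark(T tuple, long offset);

    void reset();
}

// Hypothetical count-based policy: requests a sync once `count` tuples have been marked.
final class CountSyncSketch<T> implements SyncPolicySketch<T> {
    private final int count;
    private int seen = 0;

    CountSyncSketch(int count) {
        this.count = count;
    }

    // Called once per tuple; returning true asks the caller to sync.
    @Override
    public boolean mark(T tuple, long offset) {
        seen++;
        return seen >= count;
    }

    // Called by the caller after it has performed the sync.
    @Override
    public void reset() {
        seen = 0;
    }
}
```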
| |
| <h3 id="file-rotation-policies">File Rotation Policies</h3> |
| |
| <p>Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an |
| implementation of the <code>org.apache.storm.hdfs.rotation.FileRotationPolicy</code> interface:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">FileRotationPolicy</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kt">boolean</span> <span class="nf">mark</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="kt">long</span> <span class="n">offset</span><span class="o">);</span> |
| <span class="kt">void</span> <span class="nf">reset</span><span class="o">();</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The <code>org.apache.storm.hdfs.rotation.FileSizeRotationPolicy</code> implementation allows you to trigger file rotation when |
| data files reach a specific file size:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">FileRotationPolicy</span> <span class="n">rotationPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FileSizeRotationPolicy</span><span class="o">(</span><span class="mf">5.0f</span><span class="o">,</span> <span class="n">Units</span><span class="o">.</span><span class="na">MB</span><span class="o">);</span> |
| </code></pre></div> |
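| <p>A size-based check of this kind can be approximated as below. This is a simplified, self-contained sketch and not the |
| <code>FileSizeRotationPolicy</code> source: <code>SizeRotationSketch</code> is a hypothetical name, the <code>Tuple</code> argument is omitted, the offset is |
| assumed to be the number of bytes written to the current file, and one megabyte is assumed to be 2^20 bytes.</p> |

```java
// Hypothetical size-based rotation check, for illustration only; not the storm-hdfs source.
final class SizeRotationSketch {
    private final long maxBytes;

    // unitBytes is the size of one unit in bytes, e.g. 1L << 20 if one MB is
    // taken as 2^20 bytes (an assumption of this sketch).
    SizeRotationSketch(float count, long unitBytes) {
        this.maxBytes = (long) (count * unitBytes);
    }

    // Called once per tuple with the current write offset; true means "rotate now".
    boolean mark(long offset) {
        return offset >= maxBytes;
    }

    // Called after a rotation; this sketch keeps no per-file state, so nothing is cleared.
    void reset() {
    }
}
```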
| <h3 id="file-rotation-actions">File Rotation Actions</h3> |
| |
| <p>Both the HDFS bolt and the Trident State implementation allow you to register any number of <code>RotationAction</code>s. |
| A <code>RotationAction</code> provides a hook that lets you perform some action right after a file is rotated, for |
| example moving the file to a different location or renaming it.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">RotationAction</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">FileSystem</span> <span class="n">fileSystem</span><span class="o">,</span> <span class="n">Path</span> <span class="n">filePath</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">IOException</span><span class="o">;</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>Storm-HDFS includes a simple action that will move a file after rotation:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MoveFileAction</span> <span class="kd">implements</span> <span class="n">RotationAction</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Logger</span> <span class="n">LOG</span> <span class="o">=</span> <span class="n">LoggerFactory</span><span class="o">.</span><span class="na">getLogger</span><span class="o">(</span><span class="n">MoveFileAction</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| |
| <span class="kd">private</span> <span class="n">String</span> <span class="n">destination</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="n">MoveFileAction</span> <span class="nf">toDestination</span><span class="o">(</span><span class="n">String</span> <span class="n">destDir</span><span class="o">){</span> |
| <span class="n">destination</span> <span class="o">=</span> <span class="n">destDir</span><span class="o">;</span> |
| <span class="k">return</span> <span class="k">this</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">FileSystem</span> <span class="n">fileSystem</span><span class="o">,</span> <span class="n">Path</span> <span class="n">filePath</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">IOException</span> <span class="o">{</span> |
| <span class="n">Path</span> <span class="n">destPath</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Path</span><span class="o">(</span><span class="n">destination</span><span class="o">,</span> <span class="n">filePath</span><span class="o">.</span><span class="na">getName</span><span class="o">());</span> |
| <span class="n">LOG</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">"Moving file {} to {}"</span><span class="o">,</span> <span class="n">filePath</span><span class="o">,</span> <span class="n">destPath</span><span class="o">);</span> |
| <span class="kt">boolean</span> <span class="n">success</span> <span class="o">=</span> <span class="n">fileSystem</span><span class="o">.</span><span class="na">rename</span><span class="o">(</span><span class="n">filePath</span><span class="o">,</span> <span class="n">destPath</span><span class="o">);</span> |
| <span class="k">return</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>If you are using Trident and sequence files you can do something like this:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">HdfsState</span><span class="o">.</span><span class="na">Options</span> <span class="n">seqOpts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsState</span><span class="o">.</span><span class="na">SequenceFileOptions</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFileNameFormat</span><span class="o">(</span><span class="n">fileNameFormat</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withSequenceFormat</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultSequenceFormat</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"data"</span><span class="o">))</span> |
| <span class="o">.</span><span class="na">withRotationPolicy</span><span class="o">(</span><span class="n">rotationPolicy</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withFsUrl</span><span class="o">(</span><span class="s">"hdfs://localhost:54310"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">addRotationAction</span><span class="o">(</span><span class="k">new</span> <span class="n">MoveFileAction</span><span class="o">().</span><span class="na">toDestination</span><span class="o">(</span><span class="s">"/dest2/"</span><span class="o">));</span> |
| </code></pre></div> |
| <h2 id="support-for-hdfs-sequence-files">Support for HDFS Sequence Files</h2> |
| |
| <p>The <code>org.apache.storm.hdfs.bolt.SequenceFileBolt</code> class allows you to write Storm data to HDFS sequence files:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="c1">// sync the filesystem after every 1k tuples</span> |
| <span class="n">SyncPolicy</span> <span class="n">syncPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CountSyncPolicy</span><span class="o">(</span><span class="mi">1000</span><span class="o">);</span> |
| |
| <span class="c1">// rotate files when they reach 5MB</span> |
| <span class="n">FileRotationPolicy</span> <span class="n">rotationPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FileSizeRotationPolicy</span><span class="o">(</span><span class="mf">5.0f</span><span class="o">,</span> <span class="n">Units</span><span class="o">.</span><span class="na">MB</span><span class="o">);</span> |
| |
| <span class="n">FileNameFormat</span> <span class="n">fileNameFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultFileNameFormat</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withExtension</span><span class="o">(</span><span class="s">".seq"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withPath</span><span class="o">(</span><span class="s">"/data/"</span><span class="o">);</span> |
| |
| <span class="c1">// create sequence format instance.</span> |
| <span class="n">DefaultSequenceFormat</span> <span class="n">format</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultSequenceFormat</span><span class="o">(</span><span class="s">"timestamp"</span><span class="o">,</span> <span class="s">"sentence"</span><span class="o">);</span> |
| |
| <span class="n">SequenceFileBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SequenceFileBolt</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFsUrl</span><span class="o">(</span><span class="s">"hdfs://localhost:54310"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withFileNameFormat</span><span class="o">(</span><span class="n">fileNameFormat</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withSequenceFormat</span><span class="o">(</span><span class="n">format</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withRotationPolicy</span><span class="o">(</span><span class="n">rotationPolicy</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withSyncPolicy</span><span class="o">(</span><span class="n">syncPolicy</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withCompressionType</span><span class="o">(</span><span class="n">SequenceFile</span><span class="o">.</span><span class="na">CompressionType</span><span class="o">.</span><span class="na">RECORD</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withCompressionCodec</span><span class="o">(</span><span class="s">"deflate"</span><span class="o">);</span> |
| </code></pre></div> |
| <p>The <code>SequenceFileBolt</code> requires that you provide an <code>org.apache.storm.hdfs.bolt.format.SequenceFormat</code> that maps tuples to |
| key/value pairs:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">SequenceFormat</span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="n">Class</span> <span class="nf">keyClass</span><span class="o">();</span> |
| <span class="n">Class</span> <span class="nf">valueClass</span><span class="o">();</span> |
| |
| <span class="n">Writable</span> <span class="nf">key</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="n">Writable</span> <span class="nf">value</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <h2 id="trident-api">Trident API</h2> |
| |
| <p>storm-hdfs also includes a Trident <code>state</code> implementation for writing data to HDFS, with an API that closely mirrors |
| that of the bolts.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">Fields</span> <span class="n">hdfsFields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"field1"</span><span class="o">,</span> <span class="s">"field2"</span><span class="o">);</span> |
| |
| <span class="n">FileNameFormat</span> <span class="n">fileNameFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DefaultFileNameFormat</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withPath</span><span class="o">(</span><span class="s">"/trident"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withPrefix</span><span class="o">(</span><span class="s">"trident"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withExtension</span><span class="o">(</span><span class="s">".txt"</span><span class="o">);</span> |
| |
| <span class="n">RecordFormat</span> <span class="n">recordFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DelimitedRecordFormat</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFields</span><span class="o">(</span><span class="n">hdfsFields</span><span class="o">);</span> |
| |
| <span class="n">FileRotationPolicy</span> <span class="n">rotationPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FileSizeRotationPolicy</span><span class="o">(</span><span class="mf">5.0f</span><span class="o">,</span> <span class="n">FileSizeRotationPolicy</span><span class="o">.</span><span class="na">Units</span><span class="o">.</span><span class="na">MB</span><span class="o">);</span> |
| |
| <span class="n">HdfsState</span><span class="o">.</span><span class="na">Options</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsState</span><span class="o">.</span><span class="na">HdfsFileOptions</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFileNameFormat</span><span class="o">(</span><span class="n">fileNameFormat</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withRecordFormat</span><span class="o">(</span><span class="n">recordFormat</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withRotationPolicy</span><span class="o">(</span><span class="n">rotationPolicy</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withFsUrl</span><span class="o">(</span><span class="s">"hdfs://localhost:54310"</span><span class="o">);</span> |
| |
| <span class="n">StateFactory</span> <span class="n">factory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsStateFactory</span><span class="o">().</span><span class="na">withOptions</span><span class="o">(</span><span class="n">options</span><span class="o">);</span> |
| |
| <span class="n">TridentState</span> <span class="n">state</span> <span class="o">=</span> <span class="n">stream</span> |
| <span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">factory</span><span class="o">,</span> <span class="n">hdfsFields</span><span class="o">,</span> <span class="k">new</span> <span class="n">HdfsUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">());</span> |
| </code></pre></div> |
| <p>To use the sequence file <code>State</code> implementation, use the <code>HdfsState.SequenceFileOptions</code>:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">HdfsState</span><span class="o">.</span><span class="na">Options</span> <span class="n">seqOpts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HdfsState</span><span class="o">.</span><span class="na">SequenceFileOptions</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withFileNameFormat</span><span class="o">(</span><span class="n">fileNameFormat</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withSequenceFormat</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultSequenceFormat</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"data"</span><span class="o">))</span> |
| <span class="o">.</span><span class="na">withRotationPolicy</span><span class="o">(</span><span class="n">rotationPolicy</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withFsUrl</span><span class="o">(</span><span class="s">"hdfs://localhost:54310"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">addRotationAction</span><span class="o">(</span><span class="k">new</span> <span class="n">MoveFileAction</span><span class="o">().</span><span class="na">toDestination</span><span class="o">(</span><span class="s">"/dest2/"</span><span class="o">));</span> |
| </code></pre></div> |
| <h2 id="working-with-secure-hdfs">Working with Secure HDFS</h2> |
| |
| <p>If your topology is going to interact with secure HDFS, your bolts/states need to be authenticated by the NameNode. There are |
| currently two options to support this:</p> |
| |
| <h3 id="using-hdfs-delegation-tokens">Using HDFS delegation tokens</h3> |
| |
| <p>Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user. |
| Nimbus needs to be started with the following configuration:</p> |
| |
| <ul> |
| <li><code>nimbus.autocredential.plugins.classes: ["org.apache.storm.hdfs.common.security.AutoHDFS"]</code></li> |
| <li><code>nimbus.credential.renewers.classes: ["org.apache.storm.hdfs.common.security.AutoHDFS"]</code></li> |
| <li><code>hdfs.keytab.file: "/path/to/keytab/on/nimbus"</code> (the keytab of the HDFS superuser that can impersonate other users)</li> |
| <li><code>hdfs.kerberos.principal: "superuser@EXAMPLE.com"</code></li> |
| <li><code>nimbus.credential.renewers.freq.secs: 82800</code> (23 hours; HDFS tokens need to be renewed every 24 hours, so this value should be |
| less than 24 hours)</li> |
| <li><code>topology.hdfs.uri: "hdfs://host:port"</code> (optional; by default the value of the <code>fs.defaultFS</code> property |
| specified in Hadoop's core-site.xml is used)</li> |
| </ul> |
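| |
| <p>The renewal-frequency arithmetic above can be sanity-checked with a few lines of Java. This is only a sketch: the class and |
| method names are hypothetical, and the 24-hour renewal window is taken from the description above rather than read from HDFS.</p> |

```java
import java.time.Duration;

// Hypothetical helper, not part of storm-hdfs: converts a renewal interval in hours
// to the value expected by nimbus.credential.renewers.freq.secs.
class RenewalFrequency {
    // Renewal window assumed from the text: HDFS tokens must be renewed every 24 hours.
    static final Duration TOKEN_RENEW_WINDOW = Duration.ofHours(24);

    // Returns the interval in seconds, rejecting intervals that are not
    // strictly shorter than the token renewal window.
    static long renewFreqSecs(int hours) {
        Duration freq = Duration.ofHours(hours);
        if (hours <= 0 || freq.compareTo(TOKEN_RENEW_WINDOW) >= 0) {
            throw new IllegalArgumentException(
                "renewal interval must be positive and less than 24 hours: " + hours);
        }
        return freq.getSeconds();
    }

    public static void main(String[] args) {
        // 23 hours is the interval used in the configuration above.
        System.out.println("nimbus.credential.renewers.freq.secs: " + renewFreqSecs(23));
    }
}
```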
| |
| <p>Your topology configuration should have: |
| <code>topology.auto-credentials: ["org.apache.storm.hdfs.common.security.AutoHDFS"]</code></p> |
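| |
| <p>A minimal sketch of building that topology configuration in Java follows. To keep it runnable without Storm on the classpath, |
| a plain <code>HashMap</code> stands in for <code>org.apache.storm.Config</code> (which, to my understanding, is itself a |
| <code>HashMap&lt;String, Object&gt;</code>); the class name <code>AutoHdfsTopologyConf</code> is hypothetical.</p> |

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical example class. A plain HashMap is an assumption standing in for
// org.apache.storm.Config so that the sketch compiles with the JDK alone.
class AutoHdfsTopologyConf {
    // Key and plugin class name are the ones given in the text above.
    static final String AUTO_CREDENTIALS_KEY = "topology.auto-credentials";
    static final String AUTO_HDFS_CLASS = "org.apache.storm.hdfs.common.security.AutoHDFS";

    // Builds a topology configuration map that enables the AutoHDFS plugin.
    static Map<String, Object> build() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(AUTO_CREDENTIALS_KEY, Arrays.asList(AUTO_HDFS_CLASS));
        return conf;
    }

    public static void main(String[] args) {
        // In a real topology, the same entry would be put on the Config object
        // that is passed along when the topology is submitted.
        System.out.println(build());
    }
}
```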
| |
| <p>If Nimbus does not already have the above configuration, add it and then restart Nimbus. Ensure that the Hadoop configuration |
| files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are present in Nimbus's classpath. |
| Nimbus will use the keytab and principal specified in the config to authenticate with the NameNode. From then on, for every |
| topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on that user's |
| behalf. If the topology was started with topology.auto-credentials set to AutoHDFS, Nimbus will push the |
| delegation tokens to all the workers for your topology, and the HDFS bolt/state will authenticate with the NameNode using |
| these tokens.</p> |
| |
| <p>Because Nimbus impersonates the topology submitter user, you need to ensure that the user specified in hdfs.kerberos.principal |
| has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions |
| at |
| <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html">http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html</a>.</p> |
| |
| <p>You can read about setting up secure HDFS here: <a href="http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html">http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html</a>.</p> |
| |
| <h3 id="using-keytabs-on-all-worker-hosts">Using keytabs on all worker hosts</h3> |
| |
| <p>If you have distributed the keytab files for the HDFS user to all potential worker hosts, you can use this method. Specify an |
| HDFS config key using the method <code>HdfsBolt/State.withConfigKey("somekey")</code>; the value map of this key should have the following two properties:</p> |
| |
| <ul> |
| <li><code>hdfs.keytab.file: "/path/to/keytab/"</code></li> |
| <li><code>hdfs.kerberos.principal: "user@EXAMPLE.com"</code></li> |
| </ul> |
| |
| <p>On worker hosts, the bolt/Trident state code will use the keytab file with the principal provided in the config to authenticate with |
| the NameNode. This method is somewhat risky: you need to ensure that all workers have the keytab file at the same location, and you need |
| to remember this as you bring up new hosts in the cluster.</p> |
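| |
| <p>The nesting described above (a config key whose value is a map holding the two properties) can be sketched as follows. |
| This is an illustration under assumptions: a plain <code>HashMap</code> stands in for the topology configuration, the class name |
| <code>KeytabTopologyConf</code> is hypothetical, and the key, path, and principal are the placeholder values from the text.</p> |

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical example class. Plain HashMaps are an assumption standing in for the
// Storm topology configuration so that the sketch runs with the JDK alone.
class KeytabTopologyConf {
    // Builds a topology configuration in which `configKey` maps to the
    // keytab settings that the bolt/state looks up on each worker.
    static Map<String, Object> build(String configKey, String keytabPath, String principal) {
        Map<String, Object> hdfsSettings = new HashMap<>();
        hdfsSettings.put("hdfs.keytab.file", keytabPath);
        hdfsSettings.put("hdfs.kerberos.principal", principal);

        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put(configKey, hdfsSettings);
        return topologyConf;
    }

    public static void main(String[] args) {
        // The same key would be given to the bolt/state via withConfigKey("somekey").
        Map<String, Object> conf = build("somekey", "/path/to/keytab/", "user@EXAMPLE.com");
        System.out.println(conf);
    }
}
```

| <p>Keeping both properties under a single key keeps the keytab settings for one bolt or state together in the |
| topology configuration.</p> |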
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Storm</h5> |
| <p>Storm integrates with any queueing system and any database system. Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/Rationale.html">Rationale</a></li> |
| <li><a href="/releases/current/Tutorial.html">Tutorial</a></li> |
| <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/index.html">Index</a></li> |
| <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li> |
| <li><a href="/releases/current/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2015 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |