blob: 2e7735aa8e380ec8e40284ac012928001a57db18 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Tutorials - Write records to partitioned streams</title>
<meta name="description" content="Apache DistributedLog is a high-performance replicated log.
">
<link rel="stylesheet" href="/docs/0.4.0-incubating/styles/site.css">
<link rel="stylesheet" href="/docs/0.4.0-incubating/css/theme.css">
<!-- JQuery -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
<script src="/docs/0.4.0-incubating/js/bootstrap.min.js"></script>
<link rel="canonical" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/tutorials/messaging-1.html" data-proofer-ignore>
<link rel="alternate" type="application/rss+xml" title="Apache DistributedLog (incubating)" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/feed.xml">
<!-- AnchorJS -->
<script src="//cdnjs.cloudflare.com/ajax/libs/anchor-js/3.2.0/anchor.min.js"></script>
<!-- Google Analytics -->
<script>
/*
 * Google Analytics (analytics.js) bootstrap.
 *
 * The immediately-invoked function receives window, document, the tag name
 * 'script', the analytics.js URL and the global name 'ga'. It:
 *   - stores the name 'ga' under window['GoogleAnalyticsObject'];
 *   - keeps an existing window.ga if there is one, otherwise installs a stub
 *     that pushes each call's arguments onto the ga.q array;
 *   - records the current time (as a number) in ga.l;
 *   - creates a script element with async set and src pointing at
 *     analytics.js, and inserts it before the first script element in the
 *     document.
 */
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Issue the 'create' command for property UA-83870961-1, passing 'auto' as the third argument.
ga('create', 'UA-83870961-1', 'auto');
// Issue the 'send' command with hit type 'pageview' for this page load.
ga('send', 'pageview');
</script>
<!-- End Google Analytics -->
<link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
</head>
<body role="document">
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<div class="navbar-header">
<a href="/" class="navbar-brand" >
<img alt="Brand" style="height: 28px" src="/docs/0.4.0-incubating/images/distributedlog_logo_navbar.png">
</a>
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<div id="navbar" class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<!-- Overview -->
<li><a href="/docs/0.4.0-incubating/">V0.4.0</a></li>
<!-- Concepts -->
<li><a href="/docs/0.4.0-incubating/basics/introduction">Concepts</a></li>
<!-- Quick Start -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Start<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li>
<a href="/docs/0.4.0-incubating/start/building.html">
Build DistributedLog from Source
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/start/download.html">
Download Releases
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Quickstart</strong></li>
<li>
<a href="/docs/0.4.0-incubating/start/quickstart.html">
Setup & Run Example
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-1.html">
API - Write Records (via core library)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-2.html">
API - Write Records (via write proxy)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-5.html">
API - Read Records
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Deployment</strong></li>
<li>
<a href="/docs/0.4.0-incubating/deployment/cluster.html">
Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/global-cluster.html">
Global Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/docker.html">
Docker
</a>
</li>
</ul>
</li>
<!-- API -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">API<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="/docs/0.4.0-incubating/api/java">Java</a></li>
</ul>
</li>
<!-- User Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">User Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="/docs/0.4.0-incubating/basics/introduction.html">
Introduction
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/considerations/main.html">
Considerations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/architecture/main.html">
Architecture
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/api/main.html">
API
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/configuration/main.html">
Configuration
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/design/main.html">
Detail Design
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/globalreplicatedlog/main.html">
Global Replicated Log
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/implementation/main.html">
Implementation
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/references/main.html">
References
</a>
</li>
</ul>
</li>
<!-- Admin Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Admin Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="/docs/0.4.0-incubating/deployment/cluster">Cluster Setup</a></li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/operations.html">
Operations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/loadtest.html">
Load Test
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/performance.html">
Performance Tuning
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/hardware.html">
Hardware
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/monitoring.html">
Monitoring
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/zookeeper.html">
ZooKeeper
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/bookkeeper.html">
BookKeeper
</a>
</li>
</ul>
</li>
<!-- Tutorials -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Tutorials<span class="caret"></span></a>
<ul class="dropdown-menu">
<li class="dropdown-header"><strong>Basic</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-1">Write Records (via Core Library)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-2">Write Records (via Write Proxy)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-3">Write Records to multiple streams</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-4">Atomic Write Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-5">Tailing Read Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-6">Rewind Read Records</a></li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Messaging</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-1.html">
Write records to partitioned streams
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-2.html">
Write records to multiple streams (load balancer)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-3.html">
At-least-once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-4.html">
Exactly-Once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-5.html">
Implement a kafka-like pub/sub system
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Replicated State Machines</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/replicatedstatemachines.html">
Build replicated state machines
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Analytics</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/analytics-mapreduce">Process log streams using MapReduce</a></li>
</ul>
</li>
</ul>
</div><!--/.nav-collapse -->
</div>
</nav>
<link rel="stylesheet" href="">
<div class="container" role="main">
<div class="row">
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<div class="col-md-8 col-md-offset-2">
<div class="contents topic" id="messaging-tutorial-write-records-to-multiple-streams-partitioned-by-key">
<p class="topic-title first">Messaging Tutorial - Write records to multiple streams partitioned by key</p>
<ul class="auto-toc simple">
<li><a class="reference internal" href="#how-to-write-records-to-multiple-streams-partitioning-by-key" id="id1">1&nbsp;&nbsp;&nbsp;How to write records to multiple streams partitioning by key</a><ul class="auto-toc">
<li><a class="reference internal" href="#design-a-partitioner-interface" id="id2">1.1&nbsp;&nbsp;&nbsp;Design a partitioner interface</a></li>
<li><a class="reference internal" href="#write-records-based-on-partition-key" id="id3">1.2&nbsp;&nbsp;&nbsp;Write records based on partition key</a></li>
<li><a class="reference internal" href="#run-the-tutorial" id="id4">1.3&nbsp;&nbsp;&nbsp;Run the tutorial</a><ul class="auto-toc">
<li><a class="reference internal" href="#start-the-local-bookkeeper-cluster" id="id5">1.3.1&nbsp;&nbsp;&nbsp;Start the local bookkeeper cluster</a></li>
<li><a class="reference internal" href="#start-the-write-proxy" id="id6">1.3.2&nbsp;&nbsp;&nbsp;Start the write proxy</a></li>
<li><a class="reference internal" href="#create-the-stream" id="id7">1.3.3&nbsp;&nbsp;&nbsp;Create the stream</a></li>
<li><a class="reference internal" href="#tail-the-streams" id="id8">1.3.4&nbsp;&nbsp;&nbsp;Tail the streams</a></li>
<li><a class="reference internal" href="#write-records" id="id9">1.3.5&nbsp;&nbsp;&nbsp;Write records</a></li>
<li><a class="reference internal" href="#check-the-results" id="id10">1.3.6&nbsp;&nbsp;&nbsp;Check the results</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
<div class="section" id="how-to-write-records-to-multiple-streams-partitioning-by-key">
<h2><a class="toc-backref" href="#id1">1&nbsp;&nbsp;&nbsp;How to write records to multiple streams partitioning by key</a></h2>
<p>This tutorial shows how to build a multiple-partitioned writer, which writes records to streams partitioned by key.</p>
<div class="section" id="design-a-partitioner-interface">
<h3><a class="toc-backref" href="#id2">1.1&nbsp;&nbsp;&nbsp;Design a partitioner interface</a></h3>
<p>In order to implement a multiple-partitioned writer, we need a <cite>Partitioner</cite> to partition the records into different
streams based on their keys. The partitioner takes a <cite>KEY</cite> and the total number of partitions, and computes a partition id for the
given <cite>KEY</cite>.</p>
<pre class="literal-block">
public interface Partitioner&lt;KEY&gt; {
int partition(KEY key, int totalPartitions);
}
</pre>
</div>
<div class="section" id="write-records-based-on-partition-key">
<h3><a class="toc-backref" href="#id3">1.2&nbsp;&nbsp;&nbsp;Write records based on partition key</a></h3>
<p>Once we have the <cite>Partitioner</cite> interface, it is easy to implement partitioned writer logic:</p>
<ul class="simple">
<li>The partitioner takes a <cite>KEY</cite> and the total number of partitions, and computes the partition id for the key.</li>
<li>Use the partition id to choose the stream to write.</li>
<li>Use <cite>DistributedLogClient.write(stream, ...)</cite> to write the data to the chosen stream.</li>
</ul>
<pre class="literal-block">
String[] streams = ...;
int pid = partitioner.partition(key, streams.length);
ByteBuffer value = ...;
client.write(streams[pid], value);
</pre>
</div>
<div class="section" id="run-the-tutorial">
<h3><a class="toc-backref" href="#id4">1.3&nbsp;&nbsp;&nbsp;Run the tutorial</a></h3>
<p>Run the example in the following steps:</p>
<div class="section" id="start-the-local-bookkeeper-cluster">
<h4><a class="toc-backref" href="#id5">1.3.1&nbsp;&nbsp;&nbsp;Start the local bookkeeper cluster</a></h4>
<p>You can use the following command to start the distributedlog stack locally.
After the distributedlog cluster is started, you can access it using the
distributedlog uri <em>distributedlog://127.0.0.1:7000/messaging/distributedlog</em>.</p>
<pre class="literal-block">
// dlog local ${zk-port}
./distributedlog-core/bin/dlog local 7000
</pre>
</div>
<div class="section" id="start-the-write-proxy">
<h4><a class="toc-backref" href="#id6">1.3.2&nbsp;&nbsp;&nbsp;Start the write proxy</a></h4>
<p>Start the write proxy, listening on port 8000.</p>
<pre class="literal-block">
// DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
./distributedlog-service/bin/dlog org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
</pre>
</div>
<div class="section" id="create-the-stream">
<h4><a class="toc-backref" href="#id7">1.3.3&nbsp;&nbsp;&nbsp;Create the stream</a></h4>
<p>Create the stream under the distributedlog uri.</p>
<pre class="literal-block">
// Create Stream `messaging-stream-{1,5}`
// dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r messaging-stream- -e 1-5
</pre>
</div>
<div class="section" id="tail-the-streams">
<h4><a class="toc-backref" href="#id8">1.3.4&nbsp;&nbsp;&nbsp;Tail the streams</a></h4>
<p>Tail the streams using <cite>MultiReader</cite> and wait for new records.</p>
<pre class="literal-block">
// Tailing Stream `messaging-stream-{1,5}`
// runner run org.apache.distributedlog.basic.MultiReader ${distributedlog-uri} ${stream}[, ${stream}]
./distributedlog-tutorials/distributedlog-basic/bin/runner run org.apache.distributedlog.basic.MultiReader distributedlog://127.0.0.1:7000/messaging/distributedlog messaging-stream-1,messaging-stream-2,messaging-stream-3,messaging-stream-4,messaging-stream-5
</pre>
</div>
<div class="section" id="write-records">
<h4><a class="toc-backref" href="#id9">1.3.5&nbsp;&nbsp;&nbsp;Write records</a></h4>
<p>Run the example to write records to multiple streams from a console. Each record should be in the form of <cite>KEY:VALUE</cite>.</p>
<pre class="literal-block">
// Write Records into Stream `messaging-stream-{1,5}`
// runner run org.apache.distributedlog.messaging.ConsoleProxyPartitionedMultiWriter ${distributedlog-uri} ${stream}[, ${stream}]
./distributedlog-tutorials/distributedlog-messaging/bin/runner run org.apache.distributedlog.messaging.ConsoleProxyPartitionedMultiWriter 'inet!127.0.0.1:8000' messaging-stream-1,messaging-stream-2,messaging-stream-3,messaging-stream-4,messaging-stream-5
</pre>
</div>
<div class="section" id="check-the-results">
<h4><a class="toc-backref" href="#id10">1.3.6&nbsp;&nbsp;&nbsp;Check the results</a></h4>
<p>Example output from <cite>ConsoleProxyPartitionedMultiWriter</cite> and <cite>MultiReader</cite>.</p>
<pre class="literal-block">
// Output of `ConsoleProxyPartitionedMultiWriter`
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=utf8
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver&#64;6c4cbf96)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver&#64;57052dc3)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$&#64;14ff89d7)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$&#64;14b28d06)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$&#64;56488f87)
May 08, 2016 1:22:35 PM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
INFO: Finagle version media-platform-tools/release-20160330-1117-sgerstein-9-g2dcdd6c (rev=2dcdd6c866f9bd3599ed49568d651189735e8ad6) built at 20160330-160058
[dlog] &gt; 1:value-1
[dlog] &gt; 2:value-2
[dlog] &gt; 3:value-3
[dlog] &gt; 4:value-4
[dlog] &gt; 5:value-5
[dlog] &gt;
// Output of `MultiReader`
Opening log stream messaging-stream-1
Opening log stream messaging-stream-2
Opening log stream messaging-stream-3
Opening log stream messaging-stream-4
Opening log stream messaging-stream-5
Log stream messaging-stream-2 is empty.
Wait for records from messaging-stream-2 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-2
Log stream messaging-stream-1 is empty.
Wait for records from messaging-stream-1 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-1
Log stream messaging-stream-3 is empty.
Wait for records from messaging-stream-3 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-3
Log stream messaging-stream-4 is empty.
Wait for records from messaging-stream-4 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-4
Log stream messaging-stream-5 is empty.
Wait for records from messaging-stream-5 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-5
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream messaging-stream-2
&quot;&quot;&quot;
value-1
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream messaging-stream-3
&quot;&quot;&quot;
value-2
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream messaging-stream-4
&quot;&quot;&quot;
value-3
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream messaging-stream-5
&quot;&quot;&quot;
value-4
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0} from stream messaging-stream-1
&quot;&quot;&quot;
value-5
&quot;&quot;&quot;
</pre>
</div>
</div>
</div>
</div>
</div>
<hr>
<div class="row">
<div class="col-xs-12">
<footer>
<p class="text-center">&copy; Copyright 2016
<a href="http://www.apache.org">The Apache Software Foundation.</a> All Rights Reserved.
</p>
<p class="text-center">
<a href="/docs/0.4.0-incubating/feed.xml">RSS Feed</a>
</p>
</footer>
</div>
</div>
<!-- container div end -->
</div>
<script>
// Initialise AnchorJS: place the generated anchor link on the right-hand side
// of each target, then call add() with no selector so the library applies its
// default targets.
(function attachHeadingAnchors() {
  'use strict';
  var anchorSettings = anchors.options;
  anchorSettings.placement = 'right';
  anchors.add();
}());
</script>
</body>
</html>