blob: e91f2e0e539e5b3c25610436edb4ec894009659d [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Tutorials - Write records to multiple streams using a load balancer</title>
<meta name="description" content="Apache DistributedLog is a high-performance replicated log.">
<link rel="stylesheet" href="/docs/0.4.0-incubating/styles/site.css">
<link rel="stylesheet" href="/docs/0.4.0-incubating/css/theme.css">
<!-- JQuery -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
<script src="/docs/0.4.0-incubating/js/bootstrap.min.js"></script>
<link rel="canonical" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/tutorials/messaging-2.html" data-proofer-ignore>
<link rel="alternate" type="application/rss+xml" title="Apache DistributedLog (incubating)" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/feed.xml">
<!-- AnchorJS -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/anchor-js/3.2.0/anchor.min.js"></script>
<!-- Google Analytics -->
<script>
// Google Analytics (analytics.js) asynchronous loader.
// The IIFE stores the global name 'ga' in window.GoogleAnalyticsObject and,
// unless window.ga already exists, defines it as a stub that appends each
// call's arguments to ga.q. It then stamps ga.l with the current time,
// creates a script element with async set and src pointing at analytics.js,
// and inserts it before the first script element in the document.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Issue the 'create' command for property UA-83870961-1 with the 'auto' setting.
ga('create', 'UA-83870961-1', 'auto');
// Issue the 'send' command for a 'pageview' hit.
ga('send', 'pageview');
</script>
<!-- End Google Analytics -->
<link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
</head>
<body role="document">
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<div class="navbar-header">
<a href="/" class="navbar-brand" >
<img alt="Brand" style="height: 28px" src="/docs/0.4.0-incubating/images/distributedlog_logo_navbar.png">
</a>
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<div id="navbar" class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<!-- Overview -->
<li><a href="/docs/0.4.0-incubating/">V0.4.0</a></li>
<!-- Concepts -->
<li><a href="/docs/0.4.0-incubating/basics/introduction">Concepts</a></li>
<!-- Quick Start -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Start<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li>
<a href="/docs/0.4.0-incubating/start/building.html">
Build DistributedLog from Source
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/start/download.html">
Download Releases
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Quickstart</strong></li>
<li>
<a href="/docs/0.4.0-incubating/start/quickstart.html">
Setup &amp; Run Example
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-1.html">
API - Write Records (via core library)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-2.html">
API - Write Records (via write proxy)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-5.html">
API - Read Records
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Deployment</strong></li>
<li>
<a href="/docs/0.4.0-incubating/deployment/cluster.html">
Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/global-cluster.html">
Global Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/docker.html">
Docker
</a>
</li>
</ul>
</li>
<!-- API -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">API<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="/docs/0.4.0-incubating/api/java">Java</a></li>
</ul>
</li>
<!-- User Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">User Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="/docs/0.4.0-incubating/basics/introduction.html">
Introduction
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/considerations/main.html">
Considerations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/architecture/main.html">
Architecture
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/api/main.html">
API
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/configuration/main.html">
Configuration
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/design/main.html">
Detail Design
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/globalreplicatedlog/main.html">
Global Replicated Log
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/implementation/main.html">
Implementation
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/references/main.html">
References
</a>
</li>
</ul>
</li>
<!-- Admin Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Admin Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="/docs/0.4.0-incubating/deployment/cluster">Cluster Setup</a></li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/operations.html">
Operations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/loadtest.html">
Load Test
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/performance.html">
Performance Tuning
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/hardware.html">
Hardware
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/monitoring.html">
Monitoring
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/zookeeper.html">
ZooKeeper
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/bookkeeper.html">
BookKeeper
</a>
</li>
</ul>
</li>
<!-- Tutorials -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Tutorials<span class="caret"></span></a>
<ul class="dropdown-menu">
<li class="dropdown-header"><strong>Basic</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-1">Write Records (via Core Library)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-2">Write Records (via Write Proxy)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-3">Write Records to multiple streams</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-4">Atomic Write Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-5">Tailing Read Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-6">Rewind Read Records</a></li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Messaging</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-1.html">
Write records to partitioned streams
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-2.html">
Write records to multiple streams (load balancer)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-3.html">
At-least-once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-4.html">
Exact-Once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-5.html">
Implement a kafka-like pub/sub system
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Replicated State Machines</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/replicatedstatemachines.html">
Build replicated state machines
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Analytics</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/analytics-mapreduce">Process log streams using MapReduce</a></li>
</ul>
</li>
</ul>
</div><!--/.nav-collapse -->
</div>
</nav>
<link rel="stylesheet" href="">
<div class="container" role="main">
<div class="row">
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<div class="col-md-8 col-md-offset-2">
<div class="contents topic" id="messaging-tutorial-write-records-to-multiple-streams-using-a-load-balancer">
<p class="topic-title first">Messaging Tutorial - Write records to multiple streams using a load balancer</p>
<ul class="auto-toc simple">
<li><a class="reference internal" href="#how-to-write-records-to-multiple-streams-using-a-load-balancer" id="id2">1&nbsp;&nbsp;&nbsp;How to write records to multiple streams using a load balancer</a><ul class="auto-toc">
<li><a class="reference internal" href="#make-writing-to-a-stream-as-a-finagle-service" id="id3">1.1&nbsp;&nbsp;&nbsp;Make writing to a stream as a finagle service</a></li>
<li><a class="reference internal" href="#create-a-load-balancer-from-multiple-streams" id="id4">1.2&nbsp;&nbsp;&nbsp;Create a load balancer from multiple streams</a><ul class="auto-toc">
<li><a class="reference internal" href="#create-a-servicefactory" id="id5">1.2.1&nbsp;&nbsp;&nbsp;Create a ServiceFactory</a></li>
<li><a class="reference internal" href="#create-the-load-balancer" id="id6">1.2.2&nbsp;&nbsp;&nbsp;Create the load balancer</a></li>
</ul>
</li>
<li><a class="reference internal" href="#write-records" id="id7">1.3&nbsp;&nbsp;&nbsp;Write records</a></li>
<li><a class="reference internal" href="#run-the-tutorial" id="id8">1.4&nbsp;&nbsp;&nbsp;Run the tutorial</a><ul class="auto-toc">
<li><a class="reference internal" href="#start-the-local-bookkeeper-cluster" id="id9">1.4.1&nbsp;&nbsp;&nbsp;Start the local bookkeeper cluster</a></li>
<li><a class="reference internal" href="#start-the-write-proxy" id="id10">1.4.2&nbsp;&nbsp;&nbsp;Start the write proxy</a></li>
<li><a class="reference internal" href="#create-the-stream" id="id11">1.4.3&nbsp;&nbsp;&nbsp;Create the stream</a></li>
<li><a class="reference internal" href="#tail-the-streams" id="id12">1.4.4&nbsp;&nbsp;&nbsp;Tail the streams</a></li>
<li><a class="reference internal" href="#id1" id="id13">1.4.5&nbsp;&nbsp;&nbsp;Write records</a></li>
<li><a class="reference internal" href="#check-the-results" id="id14">1.4.6&nbsp;&nbsp;&nbsp;Check the results</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
<div class="section" id="how-to-write-records-to-multiple-streams-using-a-load-balancer">
<h2><a class="toc-backref" href="#id2">1&nbsp;&nbsp;&nbsp;How to write records to multiple streams using a load balancer</a></h2>
<p>If applications do not care about ordering and just want to use multiple streams to transform messages, it is easier
to use a load balancer to balance the writes among the multiple streams.</p>
<p>This tutorial shows how to build a multi-stream writer, which uses a finagle load balancer to balance traffic among multiple streams.</p>
<div class="section" id="make-writing-to-a-stream-as-a-finagle-service">
<h3><a class="toc-backref" href="#id3">1.1&nbsp;&nbsp;&nbsp;Make writing to a stream as a finagle service</a></h3>
<p>In order to leverage the finagle load balancer to balance traffic among multiple streams, we have to expose writing
to a stream as a finagle service.</p>
<pre class="literal-block">
class StreamWriter&lt;VALUE&gt; extends Service&lt;VALUE, DLSN&gt; {
private final String stream;
private final DistributedLogClient client;
StreamWriter(String stream,
DistributedLogClient client) {
this.stream = stream;
this.client = client;
}
&#64;Override
public Future&lt;DLSN&gt; apply(VALUE request) {
return client.write(stream, ByteBuffer.wrap(request.toString().getBytes(UTF_8)));
}
}
</pre>
</div>
<div class="section" id="create-a-load-balancer-from-multiple-streams">
<h3><a class="toc-backref" href="#id4">1.2&nbsp;&nbsp;&nbsp;Create a load balancer from multiple streams</a></h3>
<div class="section" id="create-a-servicefactory">
<h4><a class="toc-backref" href="#id5">1.2.1&nbsp;&nbsp;&nbsp;Create a ServiceFactory</a></h4>
<p>Create a set of finagle <cite>ServiceFactory</cite> over multiple streams.</p>
<pre class="literal-block">
String[] streams;
Set&lt;ServiceFactory&lt;VALUE, DLSN&gt;&gt; serviceFactories = Sets.newHashSet();
for (String stream : streams) {
Service&lt;VALUE, DLSN&gt; service = new StreamWriter(stream, client);
serviceFactories.add(new SingletonFactory&lt;VALUE, DLSN&gt;(service));
}
</pre>
</div>
<div class="section" id="create-the-load-balancer">
<h4><a class="toc-backref" href="#id6">1.2.2&nbsp;&nbsp;&nbsp;Create the load balancer</a></h4>
<pre class="literal-block">
Service&lt;VALUE, DLSN&gt; writeService =
Balancers.heap(new scala.util.Random(System.currentTimeMillis()))
.newBalancer(
Activity.value(scalaSet),
NullStatsReceiver.get(),
new NoBrokersAvailableException(&quot;No partitions available&quot;)
).toService();
</pre>
</div>
</div>
<div class="section" id="write-records">
<h3><a class="toc-backref" href="#id7">1.3&nbsp;&nbsp;&nbsp;Write records</a></h3>
<p>Once the balancer service is initialized, we can write records through the balancer service.</p>
<pre class="literal-block">
Future&lt;DLSN&gt; writeFuture = writeService.apply(...);
</pre>
</div>
<div class="section" id="run-the-tutorial">
<h3><a class="toc-backref" href="#id8">1.4&nbsp;&nbsp;&nbsp;Run the tutorial</a></h3>
<p>Run the example in the following steps:</p>
<div class="section" id="start-the-local-bookkeeper-cluster">
<h4><a class="toc-backref" href="#id9">1.4.1&nbsp;&nbsp;&nbsp;Start the local bookkeeper cluster</a></h4>
<p>You can use the following command to start the distributedlog stack locally.
After the distributedlog cluster is started, you can access it using the
distributedlog uri <em>distributedlog://127.0.0.1:7000/messaging/distributedlog</em>.</p>
<pre class="literal-block">
// dlog local ${zk-port}
./distributedlog-core/bin/dlog local 7000
</pre>
</div>
<div class="section" id="start-the-write-proxy">
<h4><a class="toc-backref" href="#id10">1.4.2&nbsp;&nbsp;&nbsp;Start the write proxy</a></h4>
<p>Start the write proxy, listening on port 8000.</p>
<pre class="literal-block">
// DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u ${distributedlog-uri} -mx -c ${conf-file}
./distributedlog-service/bin/dlog org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
</pre>
</div>
<div class="section" id="create-the-stream">
<h4><a class="toc-backref" href="#id11">1.4.3&nbsp;&nbsp;&nbsp;Create the stream</a></h4>
<p>Create the stream under the distributedlog uri.</p>
<pre class="literal-block">
// Create Stream `messaging-stream-{1,5}`
// dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r messaging-stream- -e 1-5
</pre>
</div>
<div class="section" id="tail-the-streams">
<h4><a class="toc-backref" href="#id12">1.4.4&nbsp;&nbsp;&nbsp;Tail the streams</a></h4>
<p>Tail the streams using <cite>MultiReader</cite> to wait for new records.</p>
<pre class="literal-block">
// Tailing Stream `messaging-stream-{1,5}`
// runner run org.apache.distributedlog.basic.MultiReader ${distributedlog-uri} ${stream}[, ${stream}]
./distributedlog-tutorials/distributedlog-basic/bin/runner run org.apache.distributedlog.basic.MultiReader distributedlog://127.0.0.1:7000/messaging/distributedlog messaging-stream-1,messaging-stream-2,messaging-stream-3,messaging-stream-4,messaging-stream-5
</pre>
</div>
<div class="section" id="id1">
<h4><a class="toc-backref" href="#id13">1.4.5&nbsp;&nbsp;&nbsp;Write records</a></h4>
<p>Run the example to write records to multiple streams from a console.</p>
<pre class="literal-block">
// Write Records into Stream `messaging-stream-{1,5}`
// runner run org.apache.distributedlog.messaging.ConsoleProxyRRMultiWriter ${finagle-name} ${stream}[, ${stream}]
./distributedlog-tutorials/distributedlog-messaging/bin/runner run org.apache.distributedlog.messaging.ConsoleProxyRRMultiWriter 'inet!127.0.0.1:8000' messaging-stream-1,messaging-stream-2,messaging-stream-3,messaging-stream-4,messaging-stream-5
</pre>
</div>
<div class="section" id="check-the-results">
<h4><a class="toc-backref" href="#id14">1.4.6&nbsp;&nbsp;&nbsp;Check the results</a></h4>
<p>Example output from <cite>ConsoleProxyRRMultiWriter</cite> and <cite>MultiReader</cite>.</p>
<pre class="literal-block">
// Output of `ConsoleProxyRRMultiWriter`
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=utf8
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver&#64;6c4cbf96)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver&#64;57052dc3)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$&#64;14ff89d7)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$&#64;14b28d06)
May 08, 2016 1:22:35 PM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$&#64;56488f87)
May 08, 2016 1:22:35 PM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
INFO: Finagle version media-platform-tools/release-20160330-1117-sgerstein-9-g2dcdd6c (rev=2dcdd6c866f9bd3599ed49568d651189735e8ad6) built at 20160330-160058
[dlog] &gt; message-1
[dlog] &gt; message-2
[dlog] &gt; message-3
[dlog] &gt; message-4
[dlog] &gt; message-5
[dlog] &gt;
// Output of `MultiReader`
Opening log stream messaging-stream-1
Opening log stream messaging-stream-2
Opening log stream messaging-stream-3
Opening log stream messaging-stream-4
Opening log stream messaging-stream-5
Log stream messaging-stream-2 is empty.
Wait for records from messaging-stream-2 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-2
Log stream messaging-stream-1 is empty.
Wait for records from messaging-stream-1 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-1
Log stream messaging-stream-3 is empty.
Wait for records from messaging-stream-3 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-3
Log stream messaging-stream-4 is empty.
Wait for records from messaging-stream-4 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-4
Log stream messaging-stream-5 is empty.
Wait for records from messaging-stream-5 starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Open reader to read records from stream messaging-stream-5
Received record DLSN{logSegmentSequenceNo=1, entryId=2, slotId=0} from stream messaging-stream-3
&quot;&quot;&quot;
message-1
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=2, slotId=0} from stream messaging-stream-2
&quot;&quot;&quot;
message-2
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=4, slotId=0} from stream messaging-stream-2
&quot;&quot;&quot;
message-3
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=2, slotId=0} from stream messaging-stream-4
&quot;&quot;&quot;
message-4
&quot;&quot;&quot;
Received record DLSN{logSegmentSequenceNo=1, entryId=6, slotId=0} from stream messaging-stream-2
&quot;&quot;&quot;
message-5
&quot;&quot;&quot;
</pre>
</div>
</div>
</div>
</div>
</div>
<hr>
<div class="row">
<div class="col-xs-12">
<footer>
<p class="text-center">&copy; Copyright 2016
<a href="http://www.apache.org">The Apache Software Foundation.</a> All Rights Reserved.
</p>
<p class="text-center">
<a href="/docs/0.4.0-incubating/feed.xml">RSS Feed</a>
</p>
</footer>
</div>
</div>
<!-- container div end -->
</div>
<script>
// Attach AnchorJS links to the page, placed to the right of each target.
(function initAnchorLinks() {
  'use strict';
  var anchorOptions = anchors.options;
  anchorOptions.placement = 'right';
  anchors.add();
}());
</script>
</body>
</html>