blob: 7d623348869c3f6e2e0bd16edb46357a9d3b79e4 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Tutorials - At-least-once Processing</title>
<meta name="description" content="Apache DistributedLog is an high performance replicated log.
">
<link rel="stylesheet" href="/docs/0.4.0-incubating/styles/site.css">
<link rel="stylesheet" href="/docs/0.4.0-incubating/css/theme.css">
<!-- JQuery -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
<script src="/docs/0.4.0-incubating/js/bootstrap.min.js"></script>
<link rel="canonical" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/tutorials/messaging-3.html" data-proofer-ignore>
<link rel="alternate" type="application/rss+xml" title="Apache DistributedLog (incubating)" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/feed.xml">
<!-- Font Awesome -->
<script src="//cdnjs.cloudflare.com/ajax/libs/anchor-js/3.2.0/anchor.min.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-83870961-1', 'auto');
ga('send', 'pageview');
</script>
<!-- End Google Analytics -->
<link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
</head>
<body role="document">
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<div class="navbar-header">
<a href="/" class="navbar-brand" >
<img alt="Brand" style="height: 28px" src="/docs/0.4.0-incubating/images/distributedlog_logo_navbar.png">
</a>
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<div id="navbar" class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<!-- Overview -->
<li><a href="/docs/0.4.0-incubating/">V0.4.0</a></li>
<!-- Concepts -->
<li><a href="/docs/0.4.0-incubating/basics/introduction">Concepts</a></li>
<!-- Quick Start -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Start<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li>
<a href="/docs/0.4.0-incubating/start/building.html">
Build DistributedLog from Source
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/start/download.html">
Download Releases
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Quickstart</strong></li>
<li>
<a href="/docs/0.4.0-incubating/start/quickstart.html">
Setup & Run Example
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-1.html">
API - Write Records (via core library)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-2.html">
API - Write Records (via write proxy)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-5.html">
API - Read Records
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Deployment</strong></li>
<li>
<a href="/docs/0.4.0-incubating/deployment/cluster.html">
Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/global-cluster.html">
Global Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/docker.html">
Docker
</a>
</li>
</ul>
</li>
<!-- API -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">API<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="/docs/0.4.0-incubating/api/java">Java</a></li>
</ul>
</li>
<!-- User Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">User Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="/docs/0.4.0-incubating/basics/introduction.html">
Introduction
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/considerations/main.html">
Considerations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/architecture/main.html">
Architecture
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/api/main.html">
API
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/configuration/main.html">
Configuration
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/design/main.html">
Detail Design
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/globalreplicatedlog/main.html">
Global Replicated Log
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/implementation/main.html">
Implementation
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/references/main.html">
References
</a>
</li>
</ul>
</li>
<!-- Admin Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Admin Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="/docs/0.4.0-incubating/deployment/cluster">Cluster Setup</a></li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/operations.html">
Operations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/loadtest.html">
Load Test
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/performance.html">
Performance Tuning
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/hardware.html">
Hardware
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/monitoring.html">
Monitoring
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/zookeeper.html">
ZooKeeper
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/bookkeeper.html">
BookKeeper
</a>
</li>
</ul>
</li>
<!-- Tutorials -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Tutorials<span class="caret"></span></a>
<ul class="dropdown-menu">
<li class="dropdown-header"><strong>Basic</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-1">Write Records (via Core Library)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-2">Write Records (via Write Proxy)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-3">Write Records to multiple streams</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-4">Atomic Write Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-5">Tailing Read Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-6">Rewind Read Records</a></li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Messaging</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-1.html">
Write records to partitioned streams
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-2.html">
Write records to multiple streams (load balancer)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-3.html">
At-least-once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-4.html">
Exact-Once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-5.html">
Implement a kafka-like pub/sub system
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Replicated State Machines</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/replicatedstatemachines.html">
Build replicated state machines
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Analytics</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/analytics-mapreduce">Process log streams using MapReduce</a></li>
</ul>
</li>
</ul>
</div><!--/.nav-collapse -->
</div>
</nav>
<link rel="stylesheet" href="">
<div class="container" role="main">
<div class="row">
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<div class="col-md-8 col-md-offset-2">
<div class="contents topic" id="messaging-tutorial-at-least-once-processing">
<p class="topic-title first">Messaging Tutorial - At-least-once Processing</p>
<ul class="auto-toc simple">
<li><a class="reference internal" href="#at-least-once-processing" id="id1">1&nbsp;&nbsp;&nbsp;At-least-once Processing</a><ul class="auto-toc">
<li><a class="reference internal" href="#how-to-track-reading-positions" id="id2">1.1&nbsp;&nbsp;&nbsp;How to track reading positions</a><ul class="auto-toc">
<li><a class="reference internal" href="#open-the-offset-store" id="id3">1.1.1&nbsp;&nbsp;&nbsp;Open the offset store</a></li>
<li><a class="reference internal" href="#read-the-reader-read-position" id="id4">1.1.2&nbsp;&nbsp;&nbsp;Read the reader read position</a></li>
<li><a class="reference internal" href="#read-records" id="id5">1.1.3&nbsp;&nbsp;&nbsp;Read records</a></li>
<li><a class="reference internal" href="#track-read-position" id="id6">1.1.4&nbsp;&nbsp;&nbsp;Track read position</a></li>
<li><a class="reference internal" href="#record-read-position" id="id7">1.1.5&nbsp;&nbsp;&nbsp;Record read position</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
<div class="section" id="at-least-once-processing">
<h2><a class="toc-backref" href="#id1">1&nbsp;&nbsp;&nbsp;At-least-once Processing</a></h2>
<p>Applications typically choose between <cite>at-least-once</cite> and <cite>exactly-once</cite> processing semantics.
<cite>At-least-once</cite> processing guarantees that the application will process all the log records,
however when the application resumes after failure, previously processed records may be re-processed
if they have not been acknowledged. With at least once processing guarantees the application can store
reader positions in an external store and update it periodically. Upon restart the application will
reprocess messages since the last updated reader position.</p>
<p>This tutorial shows how to do <cite>at-least-once</cite> processing by using a <cite>offset-store</cite> to track the reading positions.</p>
<div class="section" id="how-to-track-reading-positions">
<h3><a class="toc-backref" href="#id2">1.1&nbsp;&nbsp;&nbsp;How to track reading positions</a></h3>
<p>Applications typically choose an external storage (e.g key/value storage) or another log stream to store their
read positions. In this example, we used a local key/value store - <cite>LevelDB</cite> to store the read positions.</p>
<div class="section" id="open-the-offset-store">
<h4><a class="toc-backref" href="#id3">1.1.1&nbsp;&nbsp;&nbsp;Open the offset store</a></h4>
<pre class="literal-block">
String offsetStoreFile = ...;
Options options = new Options();
options.createIfMissing(true);
DB offsetDB = factory.open(new File(offsetStoreFile), options);
</pre>
</div>
<div class="section" id="read-the-reader-read-position">
<h4><a class="toc-backref" href="#id4">1.1.2&nbsp;&nbsp;&nbsp;Read the reader read position</a></h4>
<p>Read the reader read position from the offset store.</p>
<pre class="literal-block">
byte[] offset = offsetDB.get(readerId.getBytes(UTF_8));
DLSN dlsn;
if (null == offset) {
dlsn = DLSN.InitialDLSN;
} else {
dlsn = DLSN.deserializeBytes(offset);
}
</pre>
</div>
<div class="section" id="read-records">
<h4><a class="toc-backref" href="#id5">1.1.3&nbsp;&nbsp;&nbsp;Read records</a></h4>
<p>Start read from the read position that recorded in offset store.</p>
<pre class="literal-block">
final AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(dlsn));
</pre>
</div>
<div class="section" id="track-read-position">
<h4><a class="toc-backref" href="#id6">1.1.4&nbsp;&nbsp;&nbsp;Track read position</a></h4>
<p>Track the last read position while reading using <cite>AtomicReference</cite>.</p>
<pre class="literal-block">
final AtomicReference&lt;DLSN&gt; lastReadDLSN = new AtomicReference&lt;DLSN&gt;(null);
reader.readNext().addEventListener(new FutureEventListener&lt;LogRecordWithDLSN&gt;() {
...
&#64;Override
public void onSuccess(LogRecordWithDLSN record) {
lastReadDLSN.set(record.getDlsn());
// process the record
...
// read next record
reader.readNext().addEventListener(this);
}
});
</pre>
</div>
<div class="section" id="record-read-position">
<h4><a class="toc-backref" href="#id7">1.1.5&nbsp;&nbsp;&nbsp;Record read position</a></h4>
<p>Periodically update the last read position to the offset store.</p>
<pre class="literal-block">
final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(new Runnable() {
&#64;Override
public void run() {
if (null != lastDLSN.get()) {
offsetDB.put(readerId.getBytes(UTF_8), lastDLSN.get().serializeBytes());
}
}
}, 10, 10, TimeUnit.SECONDS);
</pre>
<p>Check <a class="reference external" href="https://github.com/apache/distributedlog/blob/master/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java">distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets</a> for more details.</p>
</div>
</div>
</div>
</div>
</div>
<hr>
<div class="row">
<div class="col-xs-12">
<footer>
<p class="text-center">&copy; Copyright 2016
<a href="http://www.apache.org">The Apache Software Foundation.</a> All Rights Reserved.
</p>
<p class="text-center">
<a href="/docs/0.4.0-incubating/feed.xml">RSS Feed</a>
</p>
</footer>
</div>
</div>
<!-- container div end -->
</div>
<script>
(function () {
'use strict';
anchors.options.placement = 'right';
anchors.add();
})();
</script>
</body>
</html>