blob: 688d907c066d7b04567afb16b5e98c91dbfd9605 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Tutorial - DistributedLog meets MapReduce</title>
<meta name="description" content="Apache DistributedLog is an high performance replicated log.
">
<link rel="stylesheet" href="/docs/0.4.0-incubating/styles/site.css">
<link rel="stylesheet" href="/docs/0.4.0-incubating/css/theme.css">
<!-- JQuery -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
<script src="/docs/0.4.0-incubating/js/bootstrap.min.js"></script>
<link rel="canonical" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/tutorials/analytics-mapreduce.html" data-proofer-ignore>
<link rel="alternate" type="application/rss+xml" title="Apache DistributedLog (incubating)" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/feed.xml">
<!-- Font Awesome -->
<script src="//cdnjs.cloudflare.com/ajax/libs/anchor-js/3.2.0/anchor.min.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-83870961-1', 'auto');
ga('send', 'pageview');
</script>
<!-- End Google Analytics -->
<link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
</head>
<body role="document">
<nav class="navbar navbar-default navbar-fixed-top">
<div class="container">
<div class="navbar-header">
<a href="/" class="navbar-brand" >
<img alt="Brand" style="height: 28px" src="/docs/0.4.0-incubating/images/distributedlog_logo_navbar.png">
</a>
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<div id="navbar" class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<!-- Overview -->
<li><a href="/docs/0.4.0-incubating/">V0.4.0</a></li>
<!-- Concepts -->
<li><a href="/docs/0.4.0-incubating/basics/introduction">Concepts</a></li>
<!-- Quick Start -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Start<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li>
<a href="/docs/0.4.0-incubating/start/building.html">
Build DistributedLog from Source
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/start/download.html">
Download Releases
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Quickstart</strong></li>
<li>
<a href="/docs/0.4.0-incubating/start/quickstart.html">
Setup & Run Example
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-1.html">
API - Write Records (via core library)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-2.html">
API - Write Records (via write proxy)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/basic-5.html">
API - Read Records
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Deployment</strong></li>
<li>
<a href="/docs/0.4.0-incubating/deployment/cluster.html">
Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/global-cluster.html">
Global Cluster Setup
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/deployment/docker.html">
Docker
</a>
</li>
</ul>
</li>
<!-- API -->
<li>
<a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">API<span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li><a href="/docs/0.4.0-incubating/api/java">Java</a></li>
</ul>
</li>
<!-- User Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">User Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li>
<a href="/docs/0.4.0-incubating/basics/introduction.html">
Introduction
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/considerations/main.html">
Considerations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/architecture/main.html">
Architecture
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/api/main.html">
API
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/configuration/main.html">
Configuration
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/design/main.html">
Detail Design
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/globalreplicatedlog/main.html">
Global Replicated Log
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/implementation/main.html">
Implementation
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/user_guide/references/main.html">
References
</a>
</li>
</ul>
</li>
<!-- Admin Guide -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Admin Guide<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="/docs/0.4.0-incubating/deployment/cluster">Cluster Setup</a></li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/operations.html">
Operations
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/loadtest.html">
Load Test
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/performance.html">
Performance Tuning
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/hardware.html">
Hardware
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/monitoring.html">
Monitoring
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/zookeeper.html">
ZooKeeper
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/admin_guide/bookkeeper.html">
BookKeeper
</a>
</li>
</ul>
</li>
<!-- Tutorials -->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Tutorials<span class="caret"></span></a>
<ul class="dropdown-menu">
<li class="dropdown-header"><strong>Basic</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-1">Write Records (via Core Library)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-2">Write Records (via Write Proxy)</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-3">Write Records to multiple streams</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-4">Atomic Write Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-5">Tailing Read Records</a></li>
<li><a href="/docs/0.4.0-incubating/tutorials/basic-6">Rewind Read Records</a></li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Messaging</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-1.html">
Write records to partitioned streams
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-2.html">
Write records to multiple streams (load balancer)
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-3.html">
At-least-once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-4.html">
Exact-Once Processing
</a>
</li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/messaging-5.html">
Implement a kafka-like pub/sub system
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Replicated State Machines</strong></li>
<li>
<a href="/docs/0.4.0-incubating/tutorials/replicatedstatemachines.html">
Build replicated state machines
</a>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header"><strong>Analytics</strong></li>
<li><a href="/docs/0.4.0-incubating/tutorials/analytics-mapreduce">Process log streams using MapReduce</a></li>
</ul>
</li>
</ul>
</div><!--/.nav-collapse -->
</div>
</nav>
<link rel="stylesheet" href="">
<div class="container" role="main">
<div class="row">
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<div class="col-md-8 col-md-offset-2">
<div class="contents topic" id="tutorial-distributedlog-meets-mapreduce">
<p class="topic-title first">Tutorial - DistributedLog meets MapReduce</p>
<ul class="simple">
<li><a class="reference internal" href="#distributedlog-meets-mapreduce" id="id1">DistributedLog meets MapReduce</a><ul>
<li><a class="reference internal" href="#inputformat" id="id2">InputFormat</a></li>
<li><a class="reference internal" href="#log-segment-vs-data-split" id="id3">Log Segment vs Data Split</a></li>
<li><a class="reference internal" href="#log-segment-record-reader" id="id4">Log Segment Record Reader</a></li>
</ul>
</li>
</ul>
</div>
<div class="section" id="distributedlog-meets-mapreduce">
<h2><a class="toc-backref" href="#id1">DistributedLog meets MapReduce</a></h2>
<p>A distributedlog log stream is consists of log segments. Each log segment is distributed
among multiple bookies node. This nature of data distribution allows distributedlog easily
integrated with any analytics processing systems like <em>MapReduce</em> and <em>Spark</em>. This tutorial
shows how you could use <em>MapReduce</em> to process log streams' data in batch and how <em>MapReduce</em>
can leverage the data locality of log segments.</p>
<div class="section" id="inputformat">
<h3><a class="toc-backref" href="#id2">InputFormat</a></h3>
<p><strong>InputFormat</strong> is one of the fundamental class in Hadoop MapReduce framework, that is used
for accessing data from different sources. The class is responsible for defining two main
things:</p>
<ul class="simple">
<li>Data Splits</li>
<li>Record Reader</li>
</ul>
<p><em>Data Split</em> is a fundamental concept in Hadoop MapReduce framework which defines both
the size of individual Map tasks and its potential execution server. The <em>Record Reader</em> is
responsible for actual reading records from the <em>data split</em> and submitting them (as key/value
pairs) to the mapper.</p>
<p>Using distributedlog log streams as the sources for a MapReduce job, the <em>log segments</em> are
the <em>data splits</em>, while the <em>log segment reader</em> for a log segment is the <em>record reader</em> for
a <em>data split</em>.</p>
</div>
<div class="section" id="log-segment-vs-data-split">
<h3><a class="toc-backref" href="#id3">Log Segment vs Data Split</a></h3>
<p>Any split implementation extends the Apache base abstract class - <strong>InputSplit</strong>, defining a
split length and locations. A distributedlog log segment has <em>record count</em>, which could be used
to define the length of the split, and its metadata contains the storage nodes that are used to
store its log records, which could be used to define the locations of the split. So we could
create a <strong>LogSegmentSplit</strong> wrapping over a <em>LogSegment</em> (LogSegmentMetadata and LedgerMetadata).</p>
<pre class="literal-block">
public class LogSegmentSplit extends InputSplit {
private LogSegmentMetadata logSegmentMetadata;
private LedgerMetadata ledgerMetadata;
public LogSegmentSplit() {}
public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata,
LedgerMetadata ledgerMetadata) {
this.logSegmentMetadata = logSegmentMetadata;
this.ledgerMetadata = ledgerMetadata;
}
}
</pre>
<p>The length of the log segment split is the <em>number of records in the log segment</em>.</p>
<pre class="literal-block">
&#64;Override
public long getLength()
throws IOException, InterruptedException {
return logSegmentMetadata.getRecordCount();
}
</pre>
<p>The locations of the log segment split are the bookies' addresses in the ensembles of
the log segment.</p>
<pre class="literal-block">
&#64;Override
public String[] getLocations()
throws IOException, InterruptedException {
Set&lt;String&gt; locations = Sets.newHashSet();
for (ArrayList&lt;BookieSocketAddress&gt; ensemble : ledgerMetadata.getEnsembles().values()) {
for (BookieSocketAddress host : ensemble) {
locations.add(host.getHostName());
}
}
return locations.toArray(new String[locations.size()]);
}
</pre>
<p>At this point, we will have a basic <strong>LogSegmentSplit</strong> wrapping <em>LogSegmentMetadata</em> and
<em>LedgerMetadata</em>. Then we could retrieve the list of log segments of a log stream and construct
corresponding <em>data splits</em> in distributedlog inputformat.</p>
<pre class="literal-block">
public class DistributedLogInputFormat
extends InputFormat&lt;DLSN, LogRecordWithDLSN&gt; implements Configurable {
&#64;Override
public List&lt;InputSplit&gt; getSplits(JobContext jobContext)
throws IOException, InterruptedException {
List&lt;LogSegmentMetadata&gt; segments = dlm.getLogSegments();
List&lt;InputSplit&gt; inputSplits = Lists.newArrayListWithCapacity(segments.size());
BookKeeper bk = namespace.getReaderBKC().get();
LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk);
final AtomicInteger rcHolder = new AtomicInteger(0);
final AtomicReference&lt;LedgerMetadata&gt; metadataHolder = new AtomicReference&lt;LedgerMetadata&gt;(null);
for (LogSegmentMetadata segment : segments) {
final CountDownLatch latch = new CountDownLatch(1);
lm.readLedgerMetadata(segment.getLedgerId(),
new BookkeeperInternalCallbacks.GenericCallback&lt;LedgerMetadata&gt;() {
&#64;Override
public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
metadataHolder.set(ledgerMetadata);
rcHolder.set(rc);
latch.countDown();
}
});
latch.await();
if (BKException.Code.OK != rcHolder.get()) {
throw new IOException(&quot;Faild to get log segment metadata for &quot; + segment + &quot; : &quot;
+ BKException.getMessage(rcHolder.get()));
}
inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get()));
}
return inputSplits;
}
}
</pre>
</div>
<div class="section" id="log-segment-record-reader">
<h3><a class="toc-backref" href="#id4">Log Segment Record Reader</a></h3>
<p>At this point, we know how to break the log streams into <em>data splits</em>. Then we need to be able
to create a <strong>RecordReader</strong> for individual <em>data split</em>. Since each <em>data split</em> is effectively
a <em>log segment</em> in distributedlog, it is straight to implement it using distributedlog's log segment
reader. For simplicity, this example uses the raw bk api to access entries, which it doesn't
leverage features like <strong>ReadAhead</strong> provided in distributedlog. It could be changed to
use log segment reader for better performance.</p>
<p>From the <em>data split</em>, we know which log segment and its corresponding bookkeeper ledger. Then
we could open the ledger handle when initializing the record reader.</p>
<pre class="literal-block">
LogSegmentReader(String streamName,
DistributedLogConfiguration conf,
BookKeeper bk,
LogSegmentSplit split)
throws IOException {
this.streamName = streamName;
this.bk = bk;
this.metadata = split.getMetadata();
try {
this.lh = bk.openLedgerNoRecovery(
split.getLedgerId(),
BookKeeper.DigestType.CRC32,
conf.getBKDigestPW().getBytes(UTF_8));
} catch (BKException e) {
throw new IOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
</pre>
<p>Reading records from the <em>data split</em> is effectively reading records from the distributedlog
log segment.</p>
<pre class="literal-block">
try {
Enumeration&lt;LedgerEntry&gt; entries =
lh.readEntries(entryId, entryId);
if (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
Entry.newBuilder()
.setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(),
metadata.getStartSequenceId())
.setEntryId(entry.getEntryId())
.setEnvelopeEntry(
LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion()))
.deserializeRecordSet(true)
.setInputStream(entry.getEntryInputStream())
.buildReader();
}
return nextKeyValue();
} catch (BKException e) {
throw new IOException(e);
}
</pre>
<p>We could calculate the progress by comparing the position with the record count of this log segment.</p>
<pre class="literal-block">
&#64;Override
public float getProgress()
throws IOException, InterruptedException {
if (metadata.getRecordCount() &gt; 0) {
return ((float) (readPos + 1)) / metadata.getRecordCount();
}
return 1;
}
</pre>
<p>Once we have <em>LogSegmentSplit</em> and the <em>LogSegmentReader</em> over a split. We could hook them up to
implement distributedlog's InputFormat. Please check out the <a class="reference external" href="https://github.com/apache/distributedlog/tree/master/distributedlog-tutorials/distributedlog-mapreduce">code</a> for more details.</p>
</div>
</div>
</div>
</div>
<hr>
<div class="row">
<div class="col-xs-12">
<footer>
<p class="text-center">&copy; Copyright 2016
<a href="http://www.apache.org">The Apache Software Foundation.</a> All Rights Reserved.
</p>
<p class="text-center">
<a href="/docs/0.4.0-incubating/feed.xml">RSS Feed</a>
</p>
</footer>
</div>
</div>
<!-- container div end -->
</div>
<script>
(function () {
'use strict';
anchors.options.placement = 'right';
anchors.add();
})();
</script>
</body>
</html>