| <!DOCTYPE html> |
| <html lang="en"> |
| |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <title>Tutorial - DistributedLog meets MapReduce</title> |
| <meta name="description" content="Apache DistributedLog is an high performance replicated log. |
| "> |
| |
| <link rel="stylesheet" href="/docs/0.4.0-incubating/styles/site.css"> |
| <link rel="stylesheet" href="/docs/0.4.0-incubating/css/theme.css"> |
| <!-- JQuery --> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script> |
| <script src="/docs/0.4.0-incubating/js/bootstrap.min.js"></script> |
| <link rel="canonical" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/tutorials/analytics-mapreduce.html" data-proofer-ignore> |
| <link rel="alternate" type="application/rss+xml" title="Apache DistributedLog (incubating)" href="http://bookkeeper.apache.org/distributedlog/docs/0.4.0-incubating/feed.xml"> |
| <!-- Font Awesome --> |
| <script src="//cdnjs.cloudflare.com/ajax/libs/anchor-js/3.2.0/anchor.min.js"></script> |
| <!-- Google Analytics --> |
| <script> |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','https://www.google-analytics.com/analytics.js','ga'); |
| |
| ga('create', 'UA-83870961-1', 'auto'); |
| ga('send', 'pageview'); |
| </script> |
| <!-- End Google Analytics --> |
| <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico"> |
| </head> |
| |
| |
| <body role="document"> |
| |
| |
| <nav class="navbar navbar-default navbar-fixed-top"> |
| <div class="container"> |
| <div class="navbar-header"> |
| <a href="/" class="navbar-brand" > |
| <img alt="Brand" style="height: 28px" src="/docs/0.4.0-incubating/images/distributedlog_logo_navbar.png"> |
| </a> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <div id="navbar" class="navbar-collapse collapse"> |
| <ul class="nav navbar-nav"> |
| <!-- Overview --> |
| <li><a href="/docs/0.4.0-incubating/">V0.4.0</a></li> |
| <!-- Concepts --> |
| <li><a href="/docs/0.4.0-incubating/basics/introduction">Concepts</a></li> |
| <!-- Quick Start --> |
| <li> |
| <a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Start<span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/start/building.html"> |
| Build DistributedLog from Source |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/start/download.html"> |
| Download Releases |
| </a> |
| </li> |
| |
| <li role="separator" class="divider"></li> |
| <li class="dropdown-header"><strong>Quickstart</strong></li> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/start/quickstart.html"> |
| Setup & Run Example |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/basic-1.html"> |
| API - Write Records (via core library) |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/basic-2.html"> |
| API - Write Records (via write proxy) |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/basic-5.html"> |
| API - Read Records |
| </a> |
| </li> |
| |
| <li role="separator" class="divider"></li> |
| <li class="dropdown-header"><strong>Deployment</strong></li> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/deployment/cluster.html"> |
| Cluster Setup |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/deployment/global-cluster.html"> |
| Global Cluster Setup |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/deployment/docker.html"> |
| Docker |
| </a> |
| </li> |
| |
| </ul> |
| </li> |
| <!-- API --> |
| <li> |
| <a href="/docs/0.4.0-incubating/start" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">API<span class="caret"></span></a> |
| <ul class="dropdown-menu" role="menu"> |
| <li><a href="/docs/0.4.0-incubating/api/java">Java</a></li> |
| </ul> |
| </li> |
| <!-- User Guide --> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">User Guide<span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/basics/introduction.html"> |
| Introduction |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/considerations/main.html"> |
| Considerations |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/architecture/main.html"> |
| Architecture |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/api/main.html"> |
| API |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/configuration/main.html"> |
| Configuration |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/design/main.html"> |
| Detail Design |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/globalreplicatedlog/main.html"> |
| Global Replicated Log |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/implementation/main.html"> |
| Implementation |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/user_guide/references/main.html"> |
| References |
| </a> |
| </li> |
| |
| </ul> |
| </li> |
| <!-- Admin Guide --> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Admin Guide<span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/docs/0.4.0-incubating/deployment/cluster">Cluster Setup</a></li> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/operations.html"> |
| Operations |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/loadtest.html"> |
| Load Test |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/performance.html"> |
| Performance Tuning |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/hardware.html"> |
| Hardware |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/monitoring.html"> |
| Monitoring |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/zookeeper.html"> |
| ZooKeeper |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/admin_guide/bookkeeper.html"> |
| BookKeeper |
| </a> |
| </li> |
| |
| </ul> |
| </li> |
| <!-- Tutorials --> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Tutorials<span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| <li class="dropdown-header"><strong>Basic</strong></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/basic-1">Write Records (via Core Library)</a></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/basic-2">Write Records (via Write Proxy)</a></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/basic-3">Write Records to multiple streams</a></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/basic-4">Atomic Write Records</a></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/basic-5">Tailing Read Records</a></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/basic-6">Rewind Read Records</a></li> |
| <li role="separator" class="divider"></li> |
| <li class="dropdown-header"><strong>Messaging</strong></li> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/messaging-1.html"> |
| Write records to partitioned streams |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/messaging-2.html"> |
| Write records to multiple streams (load balancer) |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/messaging-3.html"> |
| At-least-once Processing |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/messaging-4.html"> |
| Exact-Once Processing |
| </a> |
| </li> |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/messaging-5.html"> |
| Implement a kafka-like pub/sub system |
| </a> |
| </li> |
| |
| <li role="separator" class="divider"></li> |
| <li class="dropdown-header"><strong>Replicated State Machines</strong></li> |
| |
| |
| <li> |
| <a href="/docs/0.4.0-incubating/tutorials/replicatedstatemachines.html"> |
| Build replicated state machines |
| </a> |
| </li> |
| |
| <li role="separator" class="divider"></li> |
| <li class="dropdown-header"><strong>Analytics</strong></li> |
| <li><a href="/docs/0.4.0-incubating/tutorials/analytics-mapreduce">Process log streams using MapReduce</a></li> |
| </ul> |
| </li> |
| </ul> |
| </div><!--/.nav-collapse --> |
| </div> |
| </nav> |
| |
| |
| <link rel="stylesheet" href=""> |
| |
| |
| <div class="container" role="main"> |
| |
| <div class="row"> |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| <div class="col-md-8 col-md-offset-2"> |
| <div class="contents topic" id="tutorial-distributedlog-meets-mapreduce"> |
| <p class="topic-title first">Tutorial - DistributedLog meets MapReduce</p> |
| <ul class="simple"> |
| <li><a class="reference internal" href="#distributedlog-meets-mapreduce" id="id1">DistributedLog meets MapReduce</a><ul> |
| <li><a class="reference internal" href="#inputformat" id="id2">InputFormat</a></li> |
| <li><a class="reference internal" href="#log-segment-vs-data-split" id="id3">Log Segment vs Data Split</a></li> |
| <li><a class="reference internal" href="#log-segment-record-reader" id="id4">Log Segment Record Reader</a></li> |
| </ul> |
| </li> |
| </ul> |
| </div> |
| <div class="section" id="distributedlog-meets-mapreduce"> |
| <h2><a class="toc-backref" href="#id1">DistributedLog meets MapReduce</a></h2> |
| <p>A distributedlog log stream is consists of log segments. Each log segment is distributed |
| among multiple bookies node. This nature of data distribution allows distributedlog easily |
| integrated with any analytics processing systems like <em>MapReduce</em> and <em>Spark</em>. This tutorial |
| shows how you could use <em>MapReduce</em> to process log streams' data in batch and how <em>MapReduce</em> |
| can leverage the data locality of log segments.</p> |
| <div class="section" id="inputformat"> |
| <h3><a class="toc-backref" href="#id2">InputFormat</a></h3> |
| <p><strong>InputFormat</strong> is one of the fundamental class in Hadoop MapReduce framework, that is used |
| for accessing data from different sources. The class is responsible for defining two main |
| things:</p> |
| <ul class="simple"> |
| <li>Data Splits</li> |
| <li>Record Reader</li> |
| </ul> |
| <p><em>Data Split</em> is a fundamental concept in Hadoop MapReduce framework which defines both |
| the size of individual Map tasks and its potential execution server. The <em>Record Reader</em> is |
| responsible for actual reading records from the <em>data split</em> and submitting them (as key/value |
| pairs) to the mapper.</p> |
| <p>Using distributedlog log streams as the sources for a MapReduce job, the <em>log segments</em> are |
| the <em>data splits</em>, while the <em>log segment reader</em> for a log segment is the <em>record reader</em> for |
| a <em>data split</em>.</p> |
| </div> |
| <div class="section" id="log-segment-vs-data-split"> |
| <h3><a class="toc-backref" href="#id3">Log Segment vs Data Split</a></h3> |
| <p>Any split implementation extends the Apache base abstract class - <strong>InputSplit</strong>, defining a |
| split length and locations. A distributedlog log segment has <em>record count</em>, which could be used |
| to define the length of the split, and its metadata contains the storage nodes that are used to |
| store its log records, which could be used to define the locations of the split. So we could |
| create a <strong>LogSegmentSplit</strong> wrapping over a <em>LogSegment</em> (LogSegmentMetadata and LedgerMetadata).</p> |
| <pre class="literal-block"> |
| public class LogSegmentSplit extends InputSplit { |
| |
| private LogSegmentMetadata logSegmentMetadata; |
| private LedgerMetadata ledgerMetadata; |
| |
| public LogSegmentSplit() {} |
| |
| public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata, |
| LedgerMetadata ledgerMetadata) { |
| this.logSegmentMetadata = logSegmentMetadata; |
| this.ledgerMetadata = ledgerMetadata; |
| } |
| |
| } |
| </pre> |
| <p>The length of the log segment split is the <em>number of records in the log segment</em>.</p> |
| <pre class="literal-block"> |
| @Override |
| public long getLength() |
| throws IOException, InterruptedException { |
| return logSegmentMetadata.getRecordCount(); |
| } |
| </pre> |
| <p>The locations of the log segment split are the bookies' addresses in the ensembles of |
| the log segment.</p> |
| <pre class="literal-block"> |
| @Override |
| public String[] getLocations() |
| throws IOException, InterruptedException { |
| Set<String> locations = Sets.newHashSet(); |
| for (ArrayList<BookieSocketAddress> ensemble : ledgerMetadata.getEnsembles().values()) { |
| for (BookieSocketAddress host : ensemble) { |
| locations.add(host.getHostName()); |
| } |
| } |
| return locations.toArray(new String[locations.size()]); |
| } |
| </pre> |
| <p>At this point, we will have a basic <strong>LogSegmentSplit</strong> wrapping <em>LogSegmentMetadata</em> and |
| <em>LedgerMetadata</em>. Then we could retrieve the list of log segments of a log stream and construct |
| corresponding <em>data splits</em> in distributedlog inputformat.</p> |
| <pre class="literal-block"> |
| public class DistributedLogInputFormat |
| extends InputFormat<DLSN, LogRecordWithDLSN> implements Configurable { |
| |
| @Override |
| public List<InputSplit> getSplits(JobContext jobContext) |
| throws IOException, InterruptedException { |
| List<LogSegmentMetadata> segments = dlm.getLogSegments(); |
| List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(segments.size()); |
| BookKeeper bk = namespace.getReaderBKC().get(); |
| LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk); |
| final AtomicInteger rcHolder = new AtomicInteger(0); |
| final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null); |
| for (LogSegmentMetadata segment : segments) { |
| final CountDownLatch latch = new CountDownLatch(1); |
| lm.readLedgerMetadata(segment.getLedgerId(), |
| new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() { |
| @Override |
| public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { |
| metadataHolder.set(ledgerMetadata); |
| rcHolder.set(rc); |
| latch.countDown(); |
| } |
| }); |
| latch.await(); |
| if (BKException.Code.OK != rcHolder.get()) { |
| throw new IOException("Faild to get log segment metadata for " + segment + " : " |
| + BKException.getMessage(rcHolder.get())); |
| } |
| inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get())); |
| } |
| return inputSplits; |
| } |
| |
| } |
| </pre> |
| </div> |
| <div class="section" id="log-segment-record-reader"> |
| <h3><a class="toc-backref" href="#id4">Log Segment Record Reader</a></h3> |
| <p>At this point, we know how to break the log streams into <em>data splits</em>. Then we need to be able |
| to create a <strong>RecordReader</strong> for individual <em>data split</em>. Since each <em>data split</em> is effectively |
| a <em>log segment</em> in distributedlog, it is straight to implement it using distributedlog's log segment |
| reader. For simplicity, this example uses the raw bk api to access entries, which it doesn't |
| leverage features like <strong>ReadAhead</strong> provided in distributedlog. It could be changed to |
| use log segment reader for better performance.</p> |
| <p>From the <em>data split</em>, we know which log segment and its corresponding bookkeeper ledger. Then |
| we could open the ledger handle when initializing the record reader.</p> |
| <pre class="literal-block"> |
| LogSegmentReader(String streamName, |
| DistributedLogConfiguration conf, |
| BookKeeper bk, |
| LogSegmentSplit split) |
| throws IOException { |
| this.streamName = streamName; |
| this.bk = bk; |
| this.metadata = split.getMetadata(); |
| try { |
| this.lh = bk.openLedgerNoRecovery( |
| split.getLedgerId(), |
| BookKeeper.DigestType.CRC32, |
| conf.getBKDigestPW().getBytes(UTF_8)); |
| } catch (BKException e) { |
| throw new IOException(e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException(e); |
| } |
| } |
| </pre> |
| <p>Reading records from the <em>data split</em> is effectively reading records from the distributedlog |
| log segment.</p> |
| <pre class="literal-block"> |
| try { |
| Enumeration<LedgerEntry> entries = |
| lh.readEntries(entryId, entryId); |
| if (entries.hasMoreElements()) { |
| LedgerEntry entry = entries.nextElement(); |
| Entry.newBuilder() |
| .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(), |
| metadata.getStartSequenceId()) |
| .setEntryId(entry.getEntryId()) |
| .setEnvelopeEntry( |
| LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion())) |
| .deserializeRecordSet(true) |
| .setInputStream(entry.getEntryInputStream()) |
| .buildReader(); |
| } |
| return nextKeyValue(); |
| } catch (BKException e) { |
| throw new IOException(e); |
| } |
| </pre> |
| <p>We could calculate the progress by comparing the position with the record count of this log segment.</p> |
| <pre class="literal-block"> |
| @Override |
| public float getProgress() |
| throws IOException, InterruptedException { |
| if (metadata.getRecordCount() > 0) { |
| return ((float) (readPos + 1)) / metadata.getRecordCount(); |
| } |
| return 1; |
| } |
| </pre> |
| <p>Once we have <em>LogSegmentSplit</em> and the <em>LogSegmentReader</em> over a split. We could hook them up to |
| implement distributedlog's InputFormat. Please check out the <a class="reference external" href="https://github.com/apache/distributedlog/tree/master/distributedlog-tutorials/distributedlog-mapreduce">code</a> for more details.</p> |
| </div> |
| </div> |
| |
| |
| </div> |
| |
| |
| |
| </div> |
| |
| |
| <hr> |
| <div class="row"> |
| <div class="col-xs-12"> |
| <footer> |
| <p class="text-center">© Copyright 2016 |
| <a href="http://www.apache.org">The Apache Software Foundation.</a> All Rights Reserved. |
| </p> |
| <p class="text-center"> |
| <a href="/docs/0.4.0-incubating/feed.xml">RSS Feed</a> |
| </p> |
| </footer> |
| </div> |
| </div> |
| <!-- container div end --> |
| </div> |
| |
| |
| <script> |
| (function () { |
| 'use strict'; |
| anchors.options.placement = 'right'; |
| anchors.add(); |
| })(); |
| </script> |
| |
| </body> |
| |
| </html> |