| <!DOCTYPE html> |
| |
| |
| <html xmlns="http://www.w3.org/1999/xhtml" lang="en"> |
| <head> |
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> |
| |
| <title>DistributedLog meets MapReduce — DistributedLog 1.0 documentation</title> |
| |
| <link rel="stylesheet" href="../_static/override.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/pygments.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/bootstrap-3.1.0/css/bootstrap.min.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/bootstrap-3.1.0/css/bootstrap-theme.min.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/css/featherlight.min.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/css/docbird.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/css/docbird-xs.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/css/jquery.rateyo.min.css" type="text/css" /> |
| <link rel="stylesheet" href="../_static/css/selection-sharer.css" type="text/css" /> |
| |
| <script type="text/javascript"> |
| var DOCUMENTATION_OPTIONS = { |
| URL_ROOT: '../', |
| VERSION: '1.0', |
| COLLAPSE_INDEX: false, |
| FILE_SUFFIX: '.html', |
| HAS_SOURCE: true |
| }; |
| </script> |
| <script type="text/javascript" src="../_static/jquery.js"></script> |
| <script type="text/javascript" src="../_static/underscore.js"></script> |
| <script type="text/javascript" src="../_static/doctools.js"></script> |
| <script type="text/javascript" src="../_static/bootstrap-3.1.0/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="../_static/js/bootstrap-docbird.js"></script> |
| <script type="text/javascript" src="../_static/js/jquery-1.11.0.min.js"></script> |
| <script type="text/javascript" src="../_static/js/jquery-fix.js"></script> |
| <script type="text/javascript" src="../_static/js/featherlight.min.js"></script> |
| <script type="text/javascript" src="../_static/js/ifvisible.js"></script> |
| <script type="text/javascript" src="../_static/js/timeme.js"></script> |
| <script type="text/javascript" src="../_static/js/jquery.rateyo.min.js"></script> |
| <script type="text/javascript" src="../_static/js/js.cookie.js"></script> |
| <link rel="shortcut icon" href="../_static/docbird.ico"/> |
| <link rel="top" title="DistributedLog 1.0 documentation" href="../index.html" /> |
| <meta charset='utf-8'> |
| <meta http-equiv='X-UA-Compatible' content='IE=edge,chrome=1'> |
| <meta name='viewport' content='width=device-width, initial-scale=1.0'> |
| <meta name="apple-mobile-web-app-capable" content="yes"> |
| |
| <meta property="docbird:project" content="DistributedLog" /> |
| |
| </head> |
| <body> |
| <div class="navbar navbar-default navbar-fixed-top" role="navigation"> |
| <div class="container-fluid"> |
| <div class="row db-header"> |
| <div class="col-sm-3 col-md-3 col-lg-3 hidden-xs db-header-controls"> |
| <a href="/" aria-label="Back to Docbird"> |
| <div class="db-home-button"> |
| <span class="glyphicon glyphicon-home" aria-hidden="true"></span> |
| </div> |
| </a> |
| |
| <form action="../search.html" method="get" class="db-searchbox-form"> |
| <div class="form-group"> |
| <input type="text" name="q" class="form-control db-searchbox-input" placeholder="Search DistributedLog" aria-label="Search DistributedLog" /> |
| </div> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| |
| </div> |
| <div class="col-sm-7 col-md-7 col-lg-7 col-xs-12 db-header-info"> |
| <div class="visible-xs"> |
| <a href="/" aria-label="Back to Docbird"> |
| <div class="db-home-button"> |
| <span class="glyphicon glyphicon-home" aria-hidden="true"></span> |
| </div> |
| </a> |
| </div> |
| <div class="visible-xs db-xs-menu-button"> |
| <div class="navbar-header"> |
| <button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#db-xs-menu"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| </div> |
| <div class="db-header-projectname"> |
| <h1><a href="../index.html">DistributedLog</a></h1> |
| </div> |
| </div> |
| </div> |
| <div class="row db-xs-menu hidden-sm hidden-md hidden-lg |
| collapse" id="db-xs-menu"> |
| |
| <form action="../search.html" method="get" class="db-searchbox-form"> |
| <div class="form-group"> |
| <input type="text" name="q" class="form-control db-searchbox-input" placeholder="Search DistributedLog" aria-label="Search DistributedLog" /> |
| </div> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| |
| <div class="db-toc" role="complementary"> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../download.html">Releases</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../download.html#rc1">0.3.51-RC1</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../download.html#rc0">0.3.51-RC0</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../basics/main.html">Getting Started</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../basics/introduction.html">Introduction</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../basics/quickstart.html">Quick Start</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../api/main.html">API</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../api/core.html">Core Library API</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../api/proxy.html">Write Proxy Client API</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../api/practice.html">Best Practices</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../configuration/main.html">Configuration</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/core.html">Core Library Configuration</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/proxy.html">Write Proxy Configuration</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/client.html">Client Configuration</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/perlog.html">Per Stream Configuration</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../considerations/main.html">Considerations</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../considerations/main.html#consistency-durability-and-ordering">Consistency, Durability and Ordering</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../considerations/main.html#partitioning">Partitioning</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../considerations/main.html#processing-semantics">Processing Semantics</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../architecture/main.html">Architecture</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../architecture/main.html#data-model">Data Model</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../architecture/main.html#software-stack">Software Stack</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../architecture/main.html#lifecyle-of-records">Lifecyle of records</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../design/main.html">Detail Design</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../design/main.html#consistency">Consistency</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../design/main.html#streaming-reads">Streaming Reads</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../design/main.html#logsegment-lifecycle">LogSegment Lifecycle</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../globalreplicatedlog/main.html">Global Replicated Log</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../globalreplicatedlog/main.html#region-aware-data-placement-policy">Region Aware Data Placement Policy</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../globalreplicatedlog/main.html#cross-region-speculative-reads">Cross Region Speculative Reads</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../implementation/main.html">Implementation</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../implementation/storage.html">Storage</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../operations/main.html">Deployment & Administration</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/deployment.html">Cluster Setup & Deployment</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/operations.html">DistributedLog Operations</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/performance.html">Performance Tuning</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/hardware.html">Hardware</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/monitoring.html">Monitoring</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/zookeeper.html">ZooKeeper</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/bookkeeper.html">BookKeeper</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../performance/main.html">Performance</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../references/main.html">References</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../references/configuration.html">Configuration Settings</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../references/metrics.html">Metrics</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../references/features.html">Features</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="main.html">Tutorials</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#basic">Basic</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#messaging">Messaging</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#replicated-state-machines">Replicated State Machines</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#analytics">Analytics</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../developer/main.html">Developer</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../developer/release.html">Release</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li> |
| </ul> |
| |
| </div> |
| </div> |
| </div> |
| </div> |
| <div class="container"> |
| <div class="row"> |
| <div style="z-index: 1" class="col-xs-12 col-sm-12 col-md-12 col-lg-12"> |
| <style> |
| .overflow-container { |
| display: none; |
| } |
| .overflow-toggle { |
| text-decoration: none; |
| border-bottom: none; |
| border-radius: 4px; |
| border: 1px solid #eee; |
| padding: 1px 3px 3px; |
| color: #888; |
| font-weight: normal; |
| background-color: linen; |
| line-height: 1.85em; |
| cursor: pointer; |
| } |
| .overflow-toggle:hover { |
| color: #333; |
| border-color: #ccc; |
| background-color: beige; |
| } |
| </style> |
| <script> |
| $(function(){ |
| $('.overflow-toggle').on('click', function(){ |
| $(this).next().toggle(); |
| }); |
| }); |
| </script> |
| <div class="db-project-header-container"> |
| <div class="row"> |
| |
| <div class="db-project-info col-lg-12 col-md-12 col-sm-12 col-xs-12"> |
| <h1> |
| <a href="../index.html">DistributedLog</a> |
| |
| </h1> |
| |
| <div class="db-code-link"> |
| <a href="https://github.com/twitter/distributedlog/tree/master/" target="_blank" rel="noopener noreferrer">https://github.com/twitter/distributedlog/tree/master/</a> |
| </div> |
| |
| |
| </div> |
| </div> |
| |
| <div class="row db-project-links-row"> |
| <div class=" col-sm-3 col-md-3 col-lg-3 db-project-link-column"> |
| <div class="db-hashtag-container"> |
| |
| <span class="db-project-link-label">OWNERS</span> |
| |
| <em>None</em> |
| |
| |
| </div> |
| </div> |
| <div class="col-sm-3 col-md-3 col-lg-3 db-project-link-column"> |
| <div class="db-hashtag-container"> |
| |
| <span class="db-project-link-label">TAGS</span> |
| |
| <em><a class="db-hashtag" href="/?q=tags:%23uses_maven">#uses_maven</a></em> |
| |
| |
| </div> |
| </div> |
| <div class="col-sm-3 col-md-3 col-lg-3 db-project-link-column"> |
| <span class="db-project-link-label">HEALTH</span> |
| |
| <h3 style="margin-top: 0"> |
| <!-- <a href="/techdocs/checklist.html" class="label label-success">--> |
| <a href="/report/distributedlog" class=""> |
| 9.0 / 10 |
| <span style="margin-left: .25em" class="glyphicon glyphicon-ok"></span> |
| </a> |
| |
| </h3> |
| </div> |
| <div class="col-sm-3 col-md-3 col-lg-3 db-project-link-column"> |
| <span class="db-project-link-label">RATING</span> |
| <div id="rateYo"></div> |
| </div> |
| </div> |
| |
| </div> |
| </div> |
| <div class="col-xs-12 col-sm-8 col-md-8 col-lg-8"> |
| <div class="db-content-body"> |
| |
| <div class="section" id="distributedlog-meets-mapreduce"> |
| <h1>DistributedLog meets MapReduce<a class="headerlink" href="#distributedlog-meets-mapreduce" title="Permalink to this headline">¶</a></h1> |
| <p>A distributedlog log stream consists of log segments. Each log |
| segment is distributed among multiple bookie nodes. This data |
| distribution allows distributedlog to be easily integrated with any analytics |
| processing system, such as <em>MapReduce</em> and <em>Spark</em>. This tutorial shows how |
| you can use <em>MapReduce</em> to process a log stream's data in batch, and how |
| <em>MapReduce</em> can leverage the data locality of log segments.</p> |
| <div class="section" id="inputformat"> |
| <h2>InputFormat<a class="headerlink" href="#inputformat" title="Permalink to this headline">¶</a></h2> |
| <p><strong>InputFormat</strong> is one of the fundamental classes in the Hadoop MapReduce |
| framework; it is used for accessing data from different sources. The |
| class is responsible for defining two main things:</p> |
| <ul class="simple"> |
| <li>Data Splits</li> |
| <li>Record Reader</li> |
| </ul> |
| <p>A <em>Data Split</em> is a fundamental concept in the Hadoop MapReduce framework |
| that defines both the size of an individual Map task and its potential |
| execution server. The <em>Record Reader</em> is responsible for actually reading |
| records from the <em>data split</em> and submitting them (as key/value pairs) |
| to the mapper.</p> |
| <p>When distributedlog log streams are used as the sources for a MapReduce job, the |
| <em>log segments</em> are the <em>data splits</em>, and the <em>log segment reader</em> for |
| a log segment is the <em>record reader</em> for its <em>data split</em>.</p> |
| </div> |
| <div class="section" id="log-segment-vs-data-split"> |
| <h2>Log Segment vs Data Split<a class="headerlink" href="#log-segment-vs-data-split" title="Permalink to this headline">¶</a></h2> |
| <p>Any split implementation extends the Hadoop base abstract class |
| <strong>InputSplit</strong>, which defines a split's length and locations. A distributedlog |
| log segment has a <em>record count</em>, which can be used to define the length |
| of the split, and its metadata contains the storage nodes that are used |
| to store its log records, which can be used to define the locations of |
| the split. So we can create a <strong>LogSegmentSplit</strong> that wraps a |
| <em>LogSegment</em> (LogSegmentMetadata and LedgerMetadata).</p> |
| <div class="highlight-python"><pre>public class LogSegmentSplit extends InputSplit { |
| |
| private LogSegmentMetadata logSegmentMetadata; |
| private LedgerMetadata ledgerMetadata; |
| |
| public LogSegmentSplit() {} |
| |
| public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata, |
| LedgerMetadata ledgerMetadata) { |
| this.logSegmentMetadata = logSegmentMetadata; |
| this.ledgerMetadata = ledgerMetadata; |
| } |
| |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>public class LogSegmentSplit extends InputSplit { |
| |
| private LogSegmentMetadata logSegmentMetadata; |
| private LedgerMetadata ledgerMetadata; |
| |
| public LogSegmentSplit() {} |
| |
| public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata, |
| LedgerMetadata ledgerMetadata) { |
| this.logSegmentMetadata = logSegmentMetadata; |
| this.ledgerMetadata = ledgerMetadata; |
| } |
| |
| }</pre> |
| </div></div> |
| <p>The length of the log segment split is the <em>number of records in the log |
| segment</em>.</p> |
| <div class="highlight-python"><pre>@Override |
| public long getLength() |
| throws IOException, InterruptedException { |
| return logSegmentMetadata.getRecordCount(); |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>@Override |
| public long getLength() |
| throws IOException, InterruptedException { |
| return logSegmentMetadata.getRecordCount(); |
| }</pre> |
| </div></div> |
| <p>The locations of the log segment split are the bookies' addresses in the |
| ensembles of the log segment.</p> |
| <div class="highlight-python"><pre>@Override |
| public String[] getLocations() |
| throws IOException, InterruptedException { |
| Set&lt;String&gt; locations = Sets.newHashSet(); |
| for (ArrayList&lt;BookieSocketAddress&gt; ensemble : ledgerMetadata.getEnsembles().values()) { |
| for (BookieSocketAddress host : ensemble) { |
| locations.add(host.getHostName()); |
| } |
| } |
| return locations.toArray(new String[locations.size()]); |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>@Override |
| public String[] getLocations() |
| throws IOException, InterruptedException { |
| Set&lt;String&gt; locations = Sets.newHashSet(); |
| for (ArrayList&lt;BookieSocketAddress&gt; ensemble : ledgerMetadata.getEnsembles().values()) { |
| for (BookieSocketAddress host : ensemble) { |
| locations.add(host.getHostName()); |
| } |
| } |
| return locations.toArray(new String[locations.size()]); |
| }</pre> |
| </div></div> |
| <p>At this point, we have a basic <strong>LogSegmentSplit</strong> wrapping |
| <em>LogSegmentMetadata</em> and <em>LedgerMetadata</em>. We can then retrieve the |
| list of log segments of a log stream and construct the corresponding <em>data |
| splits</em> in the distributedlog InputFormat.</p> |
| <div class="highlight-python"><pre>public class DistributedLogInputFormat |
| extends InputFormat&lt;DLSN, LogRecordWithDLSN&gt; implements Configurable { |
| |
| @Override |
| public List&lt;InputSplit&gt; getSplits(JobContext jobContext) |
| throws IOException, InterruptedException { |
| List&lt;LogSegmentMetadata&gt; segments = dlm.getLogSegments(); |
| List&lt;InputSplit&gt; inputSplits = Lists.newArrayListWithCapacity(segments.size()); |
| BookKeeper bk = namespace.getReaderBKC().get(); |
| LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk); |
| final AtomicInteger rcHolder = new AtomicInteger(0); |
| final AtomicReference&lt;LedgerMetadata&gt; metadataHolder = new AtomicReference&lt;LedgerMetadata&gt;(null); |
| for (LogSegmentMetadata segment : segments) { |
| final CountDownLatch latch = new CountDownLatch(1); |
| lm.readLedgerMetadata(segment.getLedgerId(), |
| new BookkeeperInternalCallbacks.GenericCallback&lt;LedgerMetadata&gt;() { |
| @Override |
| public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { |
| metadataHolder.set(ledgerMetadata); |
| rcHolder.set(rc); |
| latch.countDown(); |
| } |
| }); |
| latch.await(); |
| if (BKException.Code.OK != rcHolder.get()) { |
| throw new IOException("Failed to get log segment metadata for " + segment + " : " |
| + BKException.getMessage(rcHolder.get())); |
| } |
| inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get())); |
| } |
| return inputSplits; |
| } |
| |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>public class DistributedLogInputFormat |
| extends InputFormat&lt;DLSN, LogRecordWithDLSN&gt; implements Configurable { |
| |
| @Override |
| public List&lt;InputSplit&gt; getSplits(JobContext jobContext) |
| throws IOException, InterruptedException { |
| List&lt;LogSegmentMetadata&gt; segments = dlm.getLogSegments(); |
| List&lt;InputSplit&gt; inputSplits = Lists.newArrayListWithCapacity(segments.size()); |
| BookKeeper bk = namespace.getReaderBKC().get(); |
| LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk); |
| final AtomicInteger rcHolder = new AtomicInteger(0); |
| final AtomicReference&lt;LedgerMetadata&gt; metadataHolder = new AtomicReference&lt;LedgerMetadata&gt;(null); |
| for (LogSegmentMetadata segment : segments) { |
| final CountDownLatch latch = new CountDownLatch(1); |
| lm.readLedgerMetadata(segment.getLedgerId(), |
| new BookkeeperInternalCallbacks.GenericCallback&lt;LedgerMetadata&gt;() { |
| @Override |
| public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { |
| metadataHolder.set(ledgerMetadata); |
| rcHolder.set(rc); |
| latch.countDown(); |
| } |
| }); |
| latch.await(); |
| if (BKException.Code.OK != rcHolder.get()) { |
| throw new IOException("Failed to get log segment metadata for " + segment + " : " |
| + BKException.getMessage(rcHolder.get())); |
| } |
| inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get())); |
| } |
| return inputSplits; |
| } |
| |
| }</pre> |
| </div></div> |
| </div> |
| <div class="section" id="log-segment-record-reader"> |
| <h2>Log Segment Record Reader<a class="headerlink" href="#log-segment-record-reader" title="Permalink to this headline">¶</a></h2> |
| <p>At this point, we know how to break the log streams into <em>data splits</em>. |
| Next, we need to be able to create a <strong>RecordReader</strong> for each individual |
| <em>data split</em>. Since each <em>data split</em> is effectively a <em>log segment</em> in |
| distributedlog, it is straightforward to implement it using distributedlog's |
| log segment reader. For simplicity, this example uses the raw BookKeeper API to |
| access entries, so it doesn't leverage features such as <strong>ReadAhead</strong> |
| provided by distributedlog. It could be changed to use the log segment |
| reader for better performance.</p> |
| <p>From the <em>data split</em>, we know the log segment and its corresponding |
| BookKeeper ledger, so we can open the ledger handle when |
| initializing the record reader.</p> |
| <div class="highlight-python"><pre>LogSegmentReader(String streamName, |
| DistributedLogConfiguration conf, |
| BookKeeper bk, |
| LogSegmentSplit split) |
| throws IOException { |
| this.streamName = streamName; |
| this.bk = bk; |
| this.metadata = split.getMetadata(); |
| try { |
| this.lh = bk.openLedgerNoRecovery( |
| split.getLedgerId(), |
| BookKeeper.DigestType.CRC32, |
| conf.getBKDigestPW().getBytes(UTF_8)); |
| } catch (BKException e) { |
| throw new IOException(e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException(e); |
| } |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>LogSegmentReader(String streamName, |
| DistributedLogConfiguration conf, |
| BookKeeper bk, |
| LogSegmentSplit split) |
| throws IOException { |
| this.streamName = streamName; |
| this.bk = bk; |
| this.metadata = split.getMetadata(); |
| try { |
| this.lh = bk.openLedgerNoRecovery( |
| split.getLedgerId(), |
| BookKeeper.DigestType.CRC32, |
| conf.getBKDigestPW().getBytes(UTF_8)); |
| } catch (BKException e) { |
| throw new IOException(e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException(e); |
| } |
| }</pre> |
| </div></div> |
| <p>Reading records from the <em>data split</em> is effectively reading records |
| from the distributedlog log segment.</p> |
| <div class="highlight-python"><pre>try { |
| Enumeration&lt;LedgerEntry&gt; entries = |
| lh.readEntries(entryId, entryId); |
| if (entries.hasMoreElements()) { |
| LedgerEntry entry = entries.nextElement(); |
| reader = Entry.newBuilder() |
| .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(), |
| metadata.getStartSequenceId()) |
| .setEntryId(entry.getEntryId()) |
| .setEnvelopeEntry( |
| LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion())) |
| .deserializeRecordSet(true) |
| .setInputStream(entry.getEntryInputStream()) |
| .buildReader(); |
| } |
| return nextKeyValue(); |
| } catch (BKException e) { |
| throw new IOException(e); |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>try { |
| Enumeration&lt;LedgerEntry&gt; entries = |
| lh.readEntries(entryId, entryId); |
| if (entries.hasMoreElements()) { |
| LedgerEntry entry = entries.nextElement(); |
| reader = Entry.newBuilder() |
| .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(), |
| metadata.getStartSequenceId()) |
| .setEntryId(entry.getEntryId()) |
| .setEnvelopeEntry( |
| LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion())) |
| .deserializeRecordSet(true) |
| .setInputStream(entry.getEntryInputStream()) |
| .buildReader(); |
| } |
| return nextKeyValue(); |
| } catch (BKException e) { |
| throw new IOException(e); |
| }</pre> |
| </div></div> |
| <p>We can calculate the progress by comparing the current read position with the |
| record count of this log segment.</p> |
| <div class="highlight-python"><pre>@Override |
| public float getProgress() |
| throws IOException, InterruptedException { |
| if (metadata.getRecordCount() > 0) { |
| return ((float) (readPos + 1)) / metadata.getRecordCount(); |
| } |
| return 1; |
| }</pre> |
| <div style='display:none;' class='raw-code'><pre>@Override |
| public float getProgress() |
| throws IOException, InterruptedException { |
| if (metadata.getRecordCount() > 0) { |
| return ((float) (readPos + 1)) / metadata.getRecordCount(); |
| } |
| return 1; |
| }</pre> |
| </div></div> |
| <p>Once we have the <em>LogSegmentSplit</em> and the <em>LogSegmentReader</em> over a split, |
| we can hook them up to implement distributedlog's InputFormat. Please |
| check out the code for more details.</p> |
| </div> |
| </div> |
| |
| |
| </div> |
| </div> |
| <div class="hidden-xs col-sm-3 col-md-3 col-md-offset-1 col-lg-3 db-sidebar"> |
| |
| <div class="db-toc" role="complementary"> |
| <ul class="current"> |
| <li class="toctree-l0 current"><a class="current reference internal" href="../index.html">DistributedLog</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../download.html">Releases</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../download.html#rc1">0.3.51-RC1</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../download.html#rc0">0.3.51-RC0</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../basics/main.html">Getting Started</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../basics/introduction.html">Introduction</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../basics/quickstart.html">Quick Start</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../api/main.html">API</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../api/core.html">Core Library API</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../api/proxy.html">Write Proxy Client API</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../api/practice.html">Best Practices</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../configuration/main.html">Configuration</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/core.html">Core Library Configuration</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/proxy.html">Write Proxy Configuration</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/client.html">Client Configuration</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../configuration/perlog.html">Per Stream Configuration</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../considerations/main.html">Considerations</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../considerations/main.html#consistency-durability-and-ordering">Consistency, Durability and Ordering</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../considerations/main.html#partitioning">Partitioning</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../considerations/main.html#processing-semantics">Processing Semantics</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../architecture/main.html">Architecture</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../architecture/main.html#data-model">Data Model</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../architecture/main.html#software-stack">Software Stack</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../architecture/main.html#lifecyle-of-records">Lifecyle of records</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../design/main.html">Detail Design</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../design/main.html#consistency">Consistency</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../design/main.html#streaming-reads">Streaming Reads</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../design/main.html#logsegment-lifecycle">LogSegment Lifecycle</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../globalreplicatedlog/main.html">Global Replicated Log</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../globalreplicatedlog/main.html#region-aware-data-placement-policy">Region Aware Data Placement Policy</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../globalreplicatedlog/main.html#cross-region-speculative-reads">Cross Region Speculative Reads</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../implementation/main.html">Implementation</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../implementation/storage.html">Storage</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../operations/main.html">Deployment & Administration</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/deployment.html">Cluster Setup & Deployment</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/operations.html">DistributedLog Operations</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/performance.html">Performance Tuning</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/hardware.html">Hardware</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/monitoring.html">Monitoring</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/zookeeper.html">ZooKeeper</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../operations/bookkeeper.html">BookKeeper</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../performance/main.html">Performance</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../references/main.html">References</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../references/configuration.html">Configuration Settings</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../references/metrics.html">Metrics</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="../references/features.html">Features</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="main.html">Tutorials</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#basic">Basic</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#messaging">Messaging</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#replicated-state-machines">Replicated State Machines</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="main.html#analytics">Analytics</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../developer/main.html">Developer</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="../developer/release.html">Release</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li> |
| </ul> |
| |
| <span id="last"></span> |
| </div> |
| |
| </div> |
| <!-- <div id="slidebox"> --> |
| <!-- <button id="slidebox_close" type="button" class="close">×</button> --> |
| <!-- <p>Rate This Page</p> --> |
| <!-- <div id="rateYo"></div> --> |
| <!-- <p>Comment</p> |
| <input type="text" name="comment"></input> |
| <button>Submit</button> --> |
| <!-- </div> --> |
| </div> |
| </div> |
| <footer class="footer"> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-10 col-md-offset-1"> |
| <!-- A div (not a p) wraps these links: a p element cannot contain the block-level #sourcelink div, so the parser would close it early and leave the links outside the pull-right float. --> |
| <div class="pull-right"> |
| <a href="#">Back to top</a> |
| <br> |
| <div id="sourcelink"> |
| <!-- Browsable GitHub URL; the git SSH remote form (git@github.com:...) is not a valid link target. --> |
| <a href="https://github.com/twitter/distributedlog/blob/master/docs/tutorials/analytics-mapreduce.rst" |
| rel="nofollow">Source</a> |
| |
| <a href="../_sources/tutorials/analytics-mapreduce.txt" |
| rel="nofollow">Raw</a> |
| <a href="../__docbird-build.log" |
| rel="nofollow">Build Log</a> |
| <a href="/report/stats/distributedlog:distributedlog" |
| rel="nofollow">Stats</a> |
| </div> |
| </div> |
| <p> |
| Built and hosted by <a href="/">DocBird</a>. |
| </p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <script src="../_static/js/docbird.js"></script> |
| <script> |
| // Google Analytics, classic asynchronous ga.js snippet. |
| var _gaq = _gaq || []; |
| _gaq.push(['_setAccount', 'UA-30775-8']); |
| _gaq.push(['_trackPageview']); |
| |
| (function() { |
| var ga = document.createElement('script'); |
| ga.async = true; |
| // Always load over HTTPS: the former http://www fallback on plain-HTTP pages let a network attacker substitute the script. |
| ga.src = 'https://ssl.google-analytics.com/ga.js'; |
| // Insert before the first script element so the loader does not depend on head/body structure. |
| var s = document.getElementsByTagName('script')[0]; |
| s.parentNode.insertBefore(ga, s); |
| })(); |
| </script> |
| <!-- <script type="text/javascript" src="//s/d41d8cd98f00b204e9800998ecf8427e/en_US-tbnx1s-1988229788/6163/97/1.4.3/_/download/batch/com.atlassian.jira.collector.plugin.jira-issue-collector-plugin:issuecollector/com.atlassian.jira.collector.plugin.jira-issue-collector-plugin:issuecollector.js?collectorId=e62237fc"></script> |
| --> |
| |
| <script> |
| $(document).ready(function () { |
| // Track active reading time with TimeMe.js (https://github.com/jasonzissman/TimeMe.js); idle threshold is 30 seconds. |
| TimeMe.setIdleDurationInSeconds(30); |
| // NOTE(review): every page registers the same placeholder name; the actual page URL is sent with the event below. |
| TimeMe.setCurrentPageName("my-home-page"); |
| TimeMe.initialize(); |
| |
| var VISIT_URL = "/event/distributedlog:distributedlog/visit"; |
| |
| // Report the visit (time spent + page URL) when the user leaves the page. |
| // addEventListener, rather than assigning window.onbeforeunload, leaves any other unload handler intact. |
| window.addEventListener("beforeunload", function () { |
| var payload = $.param({ |
| total_time_reading: TimeMe.getTimeOnCurrentPageInSeconds(), |
| page: window.location.href |
| }); |
| |
| // sendBeacon is built for unload-time reporting; browsers block or deprecate synchronous XHR while a page is being dismissed. |
| // The Blob type keeps the same form-encoded content type the server already receives. |
| if (navigator.sendBeacon && |
| navigator.sendBeacon(VISIT_URL, new Blob([payload], {type: "application/x-www-form-urlencoded"}))) { |
| return; |
| } |
| |
| // Fallback when sendBeacon is unavailable or refused the data: synchronous POST with a local (not global) request object. |
| // VISIT_URL is same-origin, so cookies are sent without withCredentials. |
| var xhr = new XMLHttpRequest(); |
| xhr.open("POST", VISIT_URL, false); |
| xhr.setRequestHeader("Content-type", "application/x-www-form-urlencoded"); |
| xhr.send(payload); |
| }); |
| }); |
| </script> |
| <!-- <style> |
| #slidebox{ |
| width: 250px; |
| height: 90px; |
| padding: 10px; |
| background-color: #fff; |
| border: 1px solid #ccc; |
| position: fixed; |
| bottom: 3px; |
| right: -280px; |
| z-index: 1; |
| } |
| #slidebox .close{ |
| margin-top: -5px; |
| opacity: 0.5; |
| } |
| #slidebox .close:hover{ |
| opacity: 0.7; |
| } |
| </style> --> |
| <script> |
| $(function () { |
| // Star-rating widget. The slide-out panel that hosts #rateYo (and #slidebox) is currently commented out on this page, |
| // so bail out quietly when the element or the rateYo plugin is missing instead of throwing. |
| var $rating = $("#rateYo"); |
| if (!$rating.length || !$.fn.rateYo) { |
| return; |
| } |
| |
| var RATING_COOKIE = 'docbird.rating.distributedlog.distributedlog'; |
| // NOTE(review): assumed to be the js-cookie API (Cookies.get / Cookies.set); confirm where it is loaded. |
| var cookies = window.Cookies; |
| |
| $rating.rateYo({ |
| normalFill: "#A0A0A0", |
| halfStar: true, |
| // Restore the rating previously stored for this project, defaulting to no stars. |
| rating: ((cookies && cookies.get(RATING_COOKIE)) || 0.0) |
| }).on("rateyo.set", function (e, data) { |
| // TODO: implement a comment form in the rating slide-out; submit it with a button |
| // and read the rating with the rateYo "rating" method instead of hooking this event. |
| var event_data = { |
| comment: '', |
| rating: data.rating, |
| page: window.location.href |
| }; |
| // Write (set, not get) the rating so the restore above can find it on later visits. |
| if (cookies) { |
| cookies.set(RATING_COOKIE, data.rating); |
| } |
| $.post('/event/distributedlog:distributedlog/rating', event_data); |
| }); |
| }); |
| </script> |
| <!-- Assets are relative to this page in tutorials/, so the static dir is ../_static (matching selection-sharer.css in the head). --> |
| <script src="../_static/js/selection-sharer.js"></script> |
| <script> |
| // Attach the selection-sharer plugin to the main content; skip if the plugin script failed to load. |
| if ($.fn.selectionSharer) { |
| $('.db-content-body').selectionSharer(); |
| } |
| </script> |
| </body> |
| </html> |