<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<!--
| Generated by Apache Maven Doxia at 2021-06-15
| Rendered using Apache Maven Stylus Skin 1.5
-->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Apache Hadoop Amazon Web Services support &#x2013; S3A Committers: Architecture and Implementation</title>
<style type="text/css" media="all">
@import url("../../css/maven-base.css");
@import url("../../css/maven-theme.css");
@import url("../../css/site.css");
</style>
<link rel="stylesheet" href="../../css/print.css" type="text/css" media="print" />
<meta name="Date-Revision-yyyymmdd" content="20210615" />
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
</head>
<body class="composite">
<div id="banner">
<a href="http://hadoop.apache.org/" id="bannerLeft">
<img src="http://hadoop.apache.org/images/hadoop-logo.jpg" alt="" />
</a>
<a href="http://www.apache.org/" id="bannerRight">
<img src="http://www.apache.org/images/asf_logo_wide.png" alt="" />
</a>
<div class="clear">
<hr/>
</div>
</div>
<div id="breadcrumbs">
<div class="xleft">
<a href="http://www.apache.org/" class="externalLink">Apache</a>
&gt;
<a href="http://hadoop.apache.org/" class="externalLink">Hadoop</a>
&gt;
<a href="../../index.html">Apache Hadoop Amazon Web Services support</a>
&gt;
S3A Committers: Architecture and Implementation
</div>
<div class="xright"> <a href="http://wiki.apache.org/hadoop" class="externalLink">Wiki</a>
|
<a href="https://gitbox.apache.org/repos/asf/hadoop.git" class="externalLink">git</a>
&nbsp;| Last Published: 2021-06-15
&nbsp;| Version: 3.3.1
</div>
<div class="clear">
<hr/>
</div>
</div>
<div id="leftColumn">
<div id="navcolumn">
<h5>General</h5>
<ul>
<li class="none">
<a href="../../../index.html">Overview</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/SingleCluster.html">Single Node Setup</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/ClusterSetup.html">Cluster Setup</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/CommandsManual.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/FileSystemShell.html">FileSystem Shell</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Compatibility.html">Compatibility Specification</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/DownstreamDev.html">Downstream Developer's Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/AdminCompatibilityGuide.html">Admin Compatibility Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/InterfaceClassification.html">Interface Classification</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/filesystem/index.html">FileSystem Specification</a>
</li>
</ul>
<h5>Common</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/CLIMiniCluster.html">CLI Mini Cluster</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/FairCallQueue.html">Fair Call Queue</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/NativeLibraries.html">Native Libraries</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Superusers.html">Proxy User</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/RackAwareness.html">Rack Awareness</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/SecureMode.html">Secure Mode</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/ServiceLevelAuth.html">Service Level Authorization</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/HttpAuthentication.html">HTTP Authentication</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/CredentialProviderAPI.html">Credential Provider API</a>
</li>
<li class="none">
<a href="../../../hadoop-kms/index.html">Hadoop KMS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Tracing.html">Tracing</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/UnixShellGuide.html">Unix Shell Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/registry/index.html">Registry</a>
</li>
</ul>
<h5>HDFS</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsDesign.html">Architecture</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html">User Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html">NameNode HA With QJM</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html">NameNode HA With NFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html">Observer NameNode</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/Federation.html">Federation</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ViewFs.html">ViewFs</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ViewFsOverloadScheme.html">ViewFsOverloadScheme</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsSnapshots.html">Snapshots</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsEditsViewer.html">Edits Viewer</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html">Image Viewer</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html">Permissions and HDFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsQuotaAdminGuide.html">Quotas and HDFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/LibHdfs.html">libhdfs (C API)</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/WebHDFS.html">WebHDFS (REST API)</a>
</li>
<li class="none">
<a href="../../../hadoop-hdfs-httpfs/index.html">HttpFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html">Short Circuit Local Reads</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html">Centralized Cache Management</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsNfsGateway.html">NFS Gateway</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html">Rolling Upgrade</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ExtendedAttributes.html">Extended Attributes</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html">Transparent Encryption</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsMultihoming.html">Multihoming</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html">Storage Policies</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/MemoryStorage.html">Memory Storage Support</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/SLGUserGuide.html">Synthetic Load Generator</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html">Erasure Coding</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSDiskbalancer.html">Disk Balancer</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsUpgradeDomain.html">Upgrade Domain</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsDataNodeAdminGuide.html">DataNode Admin</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs-rbf/HDFSRouterFederation.html">Router Federation</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsProvidedStorage.html">Provided Storage</a>
</li>
</ul>
<h5>MapReduce</h5>
<ul>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html">Tutorial</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduce_Compatibility_Hadoop1_Hadoop2.html">Compatibility with 1.x</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Encrypted Shuffle</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html">Pluggable Shuffle/Sort</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html">Distributed Cache Deploy</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html">Support for YARN Shared Cache</a>
</li>
</ul>
<h5>MapReduce REST APIs</h5>
<ul>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html">MR Application Master</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-hs/HistoryServerRest.html">MR History Server</a>
</li>
</ul>
<h5>YARN</h5>
<ul>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YARN.html">Architecture</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YarnCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html">Capacity Scheduler</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/FairScheduler.html">Fair Scheduler</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRestart.html">ResourceManager Restart</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html">ResourceManager HA</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceModel.html">Resource Model</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeLabel.html">Node Labels</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeAttributes.html">Node Attributes</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html">Web Application Proxy</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html">Timeline Server</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html">Timeline Service V.2</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html">Writing YARN Applications</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html">YARN Application Security</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeManager.html">NodeManager</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/DockerContainers.html">Running Applications in Docker Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/RuncContainers.html">Running Applications in runC Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html">Using CGroups</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/SecureContainer.html">Secure Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ReservationSystem.html">Reservation System</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html">Graceful Decommission</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/OpportunisticContainers.html">Opportunistic Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/Federation.html">YARN Federation</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/SharedCache.html">Shared Cache</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/UsingGpus.html">Using GPU</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/UsingFPGA.html">Using FPGA</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html">Placement Constraints</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YarnUI2.html">YARN UI2</a>
</li>
</ul>
<h5>YARN REST APIs</h5>
<ul>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html">Introduction</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html">Resource Manager</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeManagerRest.html">Node Manager</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html#Timeline_Server_REST_API_v1">Timeline Server</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html#Timeline_Service_v.2_REST_API">Timeline Service V.2</a>
</li>
</ul>
<h5>YARN Service</h5>
<ul>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/Overview.html">Overview</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/QuickStart.html">QuickStart</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/Concepts.html">Concepts</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/YarnServiceAPI.html">Yarn Service API</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/ServiceDiscovery.html">Service Discovery</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/SystemServices.html">System Services</a>
</li>
</ul>
<h5>Hadoop Compatible File Systems</h5>
<ul>
<li class="none">
<a href="../../../hadoop-aliyun/tools/hadoop-aliyun/index.html">Aliyun OSS</a>
</li>
<li class="none">
<a href="../../../hadoop-aws/tools/hadoop-aws/index.html">Amazon S3</a>
</li>
<li class="none">
<a href="../../../hadoop-azure/index.html">Azure Blob Storage</a>
</li>
<li class="none">
<a href="../../../hadoop-azure-datalake/index.html">Azure Data Lake Storage</a>
</li>
<li class="none">
<a href="../../../hadoop-openstack/index.html">OpenStack Swift</a>
</li>
<li class="none">
<a href="../../../hadoop-cos/cloud-storage/index.html">Tencent COS</a>
</li>
</ul>
<h5>Auth</h5>
<ul>
<li class="none">
<a href="../../../hadoop-auth/index.html">Overview</a>
</li>
<li class="none">
<a href="../../../hadoop-auth/Examples.html">Examples</a>
</li>
<li class="none">
<a href="../../../hadoop-auth/Configuration.html">Configuration</a>
</li>
<li class="none">
<a href="../../../hadoop-auth/BuildingIt.html">Building</a>
</li>
</ul>
<h5>Tools</h5>
<ul>
<li class="none">
<a href="../../../hadoop-streaming/HadoopStreaming.html">Hadoop Streaming</a>
</li>
<li class="none">
<a href="../../../hadoop-archives/HadoopArchives.html">Hadoop Archives</a>
</li>
<li class="none">
<a href="../../../hadoop-archive-logs/HadoopArchiveLogs.html">Hadoop Archive Logs</a>
</li>
<li class="none">
<a href="../../../hadoop-distcp/DistCp.html">DistCp</a>
</li>
<li class="none">
<a href="../../../hadoop-gridmix/GridMix.html">GridMix</a>
</li>
<li class="none">
<a href="../../../hadoop-rumen/Rumen.html">Rumen</a>
</li>
<li class="none">
<a href="../../../hadoop-resourceestimator/ResourceEstimator.html">Resource Estimator Service</a>
</li>
<li class="none">
<a href="../../../hadoop-sls/SchedulerLoadSimulator.html">Scheduler Load Simulator</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Benchmarking.html">Hadoop Benchmarking</a>
</li>
<li class="none">
<a href="../../../hadoop-dynamometer/Dynamometer.html">Dynamometer</a>
</li>
</ul>
<h5>Reference</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/release/">Changelog and Release Notes</a>
</li>
<li class="none">
<a href="../../../api/index.html">Java API docs</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/UnixShellAPI.html">Unix Shell API</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Metrics.html">Metrics</a>
</li>
</ul>
<h5>Configuration</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/core-default.xml">core-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/hdfs-default.xml">hdfs-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs-rbf/hdfs-rbf-default.xml">hdfs-rbf-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml">mapred-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-common/yarn-default.xml">yarn-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-kms/kms-default.html">kms-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-hdfs-httpfs/httpfs-default.html">httpfs-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/DeprecatedProperties.html">Deprecated Properties</a>
</li>
</ul>
<a href="http://maven.apache.org/" title="Built by Maven" class="poweredBy">
<img alt="Built by Maven" src="../../images/logos/maven-feather.png"/>
</a>
</div>
</div>
<div id="bodyColumn">
<div id="contentBox">
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<h1>S3A Committers: Architecture and Implementation</h1><!-- DISABLEDMACRO{toc|fromDepth=0|toDepth=5} -->
<p>This document covers the architecture and implementation details of the S3A committers.</p>
<p>For information on using the committers, see <a href="./committer.html">the S3A Committers</a>.</p>
<div class="section">
<div class="section">
<h3><a name="January_2021_Update"></a>January 2021 Update</h3>
<p>Now that S3 is fully consistent, problems related to inconsistent directory listings have gone. However, the rename problem remains: committing work by renaming directories is unsafe as well as horribly slow.</p>
<p>This architecture document, and the committers, were written at a time when S3 was inconsistent. The two committers addressed this problem differently:</p>
<ul>
<li>Staging Committer: rely on a cluster HDFS filesystem for safely propagating the lists of files to commit from workers to the job manager/driver.</li>
<li>Magic Committer: require S3Guard to offer consistent directory listings on the object store.</li>
</ul>
<p>With consistent S3, the Magic Committer can be safely used with any S3 bucket. The choice of which to use, then, is a matter for experimentation.</p>
<p>This architecture document was written in 2017, a time when S3 was only consistent when an extra consistency layer such as S3Guard was used. The document indicates where requirements/constraints which existed then are now obsolete.</p></div></div>
<div class="section">
<h2><a name="Problem:_Efficient.2C_reliable_commits_of_work_to_consistent_S3_buckets"></a>Problem: Efficient, reliable commits of work to consistent S3 buckets</h2>
<p>The standard commit algorithms (the <tt>FileOutputCommitter</tt> and its v1 and v2 algorithms) rely on directory rename being an <tt>O(1)</tt> atomic operation: callers output their work to temporary directories in the destination filesystem, then rename these directories to the final destination as way of committing work. This is the perfect solution for committing work against any filesystem with consistent listing operations and where the <tt>FileSystem.rename()</tt> command is an atomic <tt>O(1)</tt> operation.</p>
<p>Using rename allows individual tasks to work in temporary directories, with the rename as the atomic operation can be used to explicitly commit tasks and ultimately the entire job. Because the cost of the rename is low, it can be performed during task and job commits with minimal delays. Note that HDFS will lock the namenode metadata during the rename operation, so all rename() calls will be serialized. However, as they only update the metadata of two directory entries, the duration of the lock is low.</p>
<p>In contrast to a &#x201c;real&#x201d; filesystem, Amazon&#x2019;s S3 object store, similar to most others, does not support <tt>rename()</tt> at all. A hash operation on the filename determines the location of the data &#x2014;there is no separate metadata to change. To mimic renaming, the Hadoop S3A client has to copy the data to a new object with the destination filename, then delete the original entry. This copy can be executed server-side, but as it does not complete until that copy has completed, it takes time proportional to the amount of data.</p>
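<p>To illustrate the cost, the mimicked directory rename amounts to the sequence below. This is a sketch in the same pseudocode style as the rest of this document; the <tt>s3</tt> operations named here (<tt>listObjects</tt>, <tt>copyObject</tt>, <tt>deleteObject</tt>) are illustrative stand-ins for the underlying S3 REST calls, and the real S3A client batches deletes and can parallelize copies.</p>
<div>
<div>
<pre class="source">def fakeRenameDirectory(s3, srcDir, destDir):
  # enumerate every object under the source directory prefix
  for key in s3.listObjects(prefix = srcDir):
    destKey = destDir + key[len(srcDir):]
    # server-side COPY: O(data) per object, even though no bytes
    # pass through the client
    s3.copyObject(srcKey = key, destKey = destKey)
  # only after the copies succeed are the original objects deleted
  for key in s3.listObjects(prefix = srcDir):
    s3.deleteObject(key)
</pre></div></div>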
<p>The rename overhead is the most visible issue, but it is not the most dangerous. More dangerous was the fact that, until late 2020, path listings had no consistency guarantees and could lag the addition or deletion of files. If files were not listed, the commit operation would <i>not</i> copy them, and so they would not appear in the final output.</p>
<p>The solution to this problem is closely coupled to the S3 protocol itself: delayed completion of multi-part PUT operations.</p>
<p>That is: tasks write all data as multipart uploads, <i>but delay the final commit action until the final, single job commit action.</i> Only that data committed in the job commit action will be made visible; work from speculative and failed tasks will not be instantiated. As there is no rename, there is no delay while data is copied from a temporary directory to the final directory. The duration of the commit will be the time needed to determine which commit operations to construct, and to execute them.</p></div>
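<p>Sketched in the document&#x2019;s pseudocode style, a task writes each file as a multipart upload but stops short of the final completion call, persisting whatever is needed to complete (or abort) it later; the job commit then issues those completion calls. The operation names below follow the S3 multipart upload API (initiate, upload part, complete); how the pending-commit information is persisted and propagated to the job committer is committer-specific and not shown.</p>
<div>
<div>
<pre class="source"># task side: upload the data, but do not make it visible
def writeFileAsPendingUpload(s3, destKey, blocks):
  uploadId = s3.initiateMultipartUpload(destKey)
  etags = [s3.uploadPart(destKey, uploadId, partNumber, block)
           for (partNumber, block) in enumerate(blocks, 1)]
  # persist everything needed to finish (or abort) the upload later
  return {'key': destKey, 'uploadId': uploadId, 'etags': etags}

# job commit: make the output of all committed tasks visible
def commitPendingUploads(s3, pendingUploads):
  for pending in pendingUploads:
    s3.completeMultipartUpload(pending['key'],
        pending['uploadId'], pending['etags'])
</pre></div></div>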
<div class="section">
<h2><a name="Terminology"></a>Terminology</h2>
<ul>
<li><i>Job</i>: a potentially parallelized query/operation to execute. The execution of a job (the division of work into tasks and the management of their completion) is generally coordinated in a single process.</li>
</ul>
<p>The output of a Job is made visible to other stages in a larger operation sequence or other applications if the job <i>completes successfully</i>.</p>
<ul>
<li>
<p><i>Job Driver</i>. Not sure quite what term to use here. Whatever process schedules task execution, tracks success/failures and determines when all the work has been processed and then commits the output. It may also determine that a job has failed and cannot be recovered, in which case the job is aborted. In MR and Tez, this is inside the YARN application master. In Spark it is the driver, which can run in the AM, the YARN client, or other places (e.g. Livy?).</p>
</li>
<li>
<p><i>Final directory</i>: the directory into which the output of a job is placed so as to be visible.</p>
</li>
<li>
<p><i>Task</i>: a single operation within a job, on a single process, which generates one or more files. After a successful job completion, the data MUST be visible in the final directory. A task completes successfully if it generates all the output it expects to without failing in some way (error in processing; network/process failure).</p>
</li>
<li>
<p><i>Job Context</i> an instance of the class <tt>org.apache.hadoop.mapreduce.JobContext</tt>, which provides a read-only view of the Job for the Job Driver and tasks.</p>
</li>
<li>
<p><i>Task Attempt Context</i> an instance of the class <tt>org.apache.hadoop.mapreduce.TaskAttemptContext extends JobContext, Progressable</tt>, which provides operations for tasks, such as getting and setting status, progress and counter values.</p>
</li>
<li>
<p><i>Task Working Directory</i>: a directory for exclusive access by a single task, into which uncommitted work may be placed.</p>
</li>
<li>
<p><i>Task Commit</i> The act of taking the output of a task, as found in the Task Working Directory, and making it visible in the final directory. This is traditionally implemented via a <tt>FileSystem.rename()</tt> call.</p>
</li>
</ul>
<p>It is useful to differentiate between a <i>task-side commit</i>: an operation performed in the task process after its work has completed, and a <i>driver-side task commit</i>, in which the Job driver performs the commit operation. Any task-side commit work will be performed across the cluster, and may take place off the critical path for job execution. However, unless the commit protocol requires all tasks to await a signal from the job driver, task-side commits cannot instantiate their output in the final directory. They may be used to promote the output of a successful task into a state ready for the job commit, addressing speculative execution and failures.</p>
<ul>
<li>
<p><i>Job Commit</i> The act of taking all successfully completed tasks of a job, and committing them. This process is generally non-atomic; as it is often a serialized operation at the end of a job, its performance can be a bottleneck.</p>
</li>
<li>
<p><i>Task Abort</i> To cancel a task such that its data is not committed.</p>
</li>
<li>
<p><i>Job Abort</i> To cancel all work in a job: no task&#x2019;s work is committed.</p>
</li>
<li>
<p><i>Speculative Task Execution/ &#x201c;Speculation&#x201d;</i> Running multiple tasks against the same input dataset in parallel, with the first task which completes being the one which is considered successful. Its output SHALL be committed; the other task SHALL be aborted. There&#x2019;s a requirement that a task can be executed in parallel, and that the output of a task MUST NOT BE visible until the job is committed, at the behest of the Job driver. There is the expectation that the output SHOULD BE the same on each task, though that MAY NOT be the case. What matters is that if any instance of a speculative task is committed, the output MUST BE considered valid.</p>
</li>
</ul>
<p>There is an expectation that the Job Driver and tasks can communicate: if a task performs any operations itself during the task commit phase, it shall only do this when instructed by the Job Driver. Similarly, if a task is unable to communicate its final status to the Job Driver, it MUST NOT commit its work. This is very important when working with S3, as some network partitions could isolate a task from the Job Driver, while the task retains access to S3.</p>
<div class="section">
<h2><a name="The_execution_workflow"></a>The execution workflow</h2>
<p><b>setup</b>:</p>
<ul>
<li>A job is created, assigned a Job ID (YARN?).</li>
<li>For each attempt, an attempt ID is created, to build the job attempt ID.</li>
<li><tt>Driver</tt>: a <tt>JobContext</tt> is created/configured</li>
<li>A committer instance is instantiated with the <tt>JobContext</tt>; <tt>setupJob()</tt> invoked.</li>
</ul></div>
<div class="section">
<h2><a name="The_FileOutputCommitter"></a>The <tt>FileOutputCommitter</tt></h2>
<p>The standard commit protocols are implemented in <tt>org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter</tt>.</p>
<p>There are two algorithms. The &#x201c;v1&#x201d; algorithm is designed to address failures and restarts of the MapReduce application master. The v2 algorithm cannot recover from failure except by re-executing the entire job. It does, however, propagate all its work to the output directory in the task commit. When working with object stores which mimic <tt>rename()</tt> by copy and delete, it is more efficient due to the reduced number of listings, copies and deletes, and, because these are executed in task commits, it eliminates the final <tt>O(data)</tt> pause at the end of all work. It is still highly inefficient, but the inefficiencies are less visible as large pauses in execution.</p>
<p>Notes</p>
<ul>
<li>The v1 algorithm was implemented to handle MapReduce AM restarts; it was not used in Hadoop 1.x, whose JobTracker could not recover from failures. Historically, then, it is the second version of a file commit algorithm.</li>
<li>Because the renames are considered to be fast, there is no logging of renames being in progress, or their duration.</li>
<li>Speculative execution is supported by having every task attempt write its uncommitted data to a task attempt directory. When a task is ready to commit, it must notify the job driver that it is ready to commit; the job driver will then commit or abort the task.</li>
</ul></div>
<div class="section">
<h2><a name="Hadoop_MR_Commit_algorithm_.E2.80.9C1.E2.80.9D"></a>Hadoop MR Commit algorithm &#x201c;1&#x201d;</h2>
<p>The &#x201c;v1&#x201d; commit algorithm is the default commit algorithm in Hadoop 2.x; it was implemented as part of <a class="externalLink" href="https://issues.apache.org/jira/browse/MAPREDUCE-2702">MAPREDUCE-2702</a>.</p>
<p>This algorithm is designed to handle a failure and restart of the Job driver, with the restarted job driver only rerunning the incomplete tasks; the output of the completed tasks is recovered for commitment when the restarted job completes.</p>
<p>There is a cost: the time to commit, which is spent recursively listing all files in all committed task directories and renaming them.</p>
<p>As this is performed sequentially, time to commit is <tt>O(files)</tt>, which is generally <tt>O(tasks)</tt>.</p>
<div>
<div>
<pre class="source">#Job Attempt Path is `$dest/_temporary/$appAttemptId/`
jobAttemptPath = '$dest/_temporary/$appAttemptId/'
# Task Attempt Path is `$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID`
taskAttemptPath = '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
#Task committed path is `$dest/_temporary/$appAttemptId/$taskAttemptID`
taskCommittedPath = '$dest/_temporary/$appAttemptId/$taskAttemptID'
</pre></div></div>
<p>Tasks write in/under the task attempt path.</p>
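<p>As a purely hypothetical example, for application attempt <tt>1</tt> of a job writing to <tt>/results</tt>, with a task attempt ID of <tt>attempt_1632203232_0001_m_000000_0</tt>, these paths expand to:</p>
<div>
<div>
<pre class="source"># hypothetical values, for illustration only
jobAttemptPath    = '/results/_temporary/1/'
taskAttemptPath   = '/results/_temporary/1/_temporary/attempt_1632203232_0001_m_000000_0'
taskCommittedPath = '/results/_temporary/1/attempt_1632203232_0001_m_000000_0'
</pre></div></div>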
<div class="section">
<h3><a name="Job_Setup"></a>Job Setup</h3>
<div>
<div>
<pre class="source">fs.mkdir(jobAttemptPath)
</pre></div></div>
</div>
<div class="section">
<h3><a name="Task_Setup"></a>Task Setup</h3>
<p>None: directories are created on demand.</p></div>
<div class="section">
<h3><a name="Task_Commit"></a>Task Commit</h3>
<p>Rename task attempt path to task committed path.</p>
<div>
<div>
<pre class="source">def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
return fs.exists(taskAttemptPath)
def commitTask(fs, jobAttemptPath, taskAttemptPath, dest):
if fs.exists(taskAttemptPath) :
fs.delete(taskCommittedPath, recursive=True)
fs.rename(taskAttemptPath, taskCommittedPath)
</pre></div></div>
<p>On a genuine filesystem this is an <tt>O(1)</tt> directory rename.</p>
<p>On an object store with a mimicked rename, it is <tt>O(data)</tt> for the copy, along with overhead for listing and deleting all files (for S3, that&#x2019;s <tt>(1 + files/500)</tt> lists, and the same number of delete calls).</p></div>
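<p>As a small worked example (runnable Python, assuming the 500-objects-per-call batching above), a task whose attempt directory holds 10,000 files would need 21 list calls and 21 bulk delete calls on top of the data copies themselves:</p>
<div>
<div>
<pre class="source">def commit_request_count(files, page_size=500):
    # one initial listing plus one per further page, as described above
    lists = 1 + files // page_size
    deletes = lists
    return lists, deletes

print(commit_request_count(10000))   # prints (21, 21)
</pre></div></div>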
<div class="section">
<h3><a name="Task_Abort"></a>Task Abort</h3>
<p>Delete task attempt path.</p>
<div>
<div>
<pre class="source">def abortTask(fs, jobAttemptPath, taskAttemptPath, dest):
fs.delete(taskAttemptPath, recursive=True)
</pre></div></div>
<p>On a genuine filesystem this is an <tt>O(1)</tt> operation. On an object store, proportional to the time to list and delete files, usually in batches.</p></div>
<div class="section">
<h3><a name="Job_Commit"></a>Job Commit</h3>
<p>Merge all files/directories in all task committed paths into the final destination path. Optionally, create a 0-byte <tt>_SUCCESS</tt> file in the destination path.</p>
<div>
<div>
<pre class="source">def commitJob(fs, jobAttemptDir, dest):
for committedTask in fs.listFiles(jobAttemptDir):
mergePathsV1(fs, committedTask, dest)
fs.touch(&quot;$dest/_SUCCESS&quot;)
</pre></div></div>
<p>(See below for details on <tt>mergePaths()</tt>)</p>
<p>A failure during job commit cannot be recovered from except by re-executing the entire query:</p>
<div>
<div>
<pre class="source">def isCommitJobRepeatable() :
return False
</pre></div></div>
<p>Accordingly, it is a failure point in the protocol. With a low number of files and fast rename/list algorithms, the window of vulnerability is low. At scale, the vulnerability increases. It could actually be reduced through parallel execution of the renaming of committed tasks.</p></div>
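<p>A sketch of that parallelization, using Python&#x2019;s <tt>concurrent.futures</tt> against the same abstract <tt>fs</tt> interface used in the pseudocode above (the pool size and error handling are illustrative; this is not the actual <tt>FileOutputCommitter</tt> code):</p>
<div>
<div>
<pre class="source">from concurrent.futures import ThreadPoolExecutor

def commitJobParallel(fs, jobAttemptDir, dest, poolSize = 16):
  with ThreadPoolExecutor(max_workers = poolSize) as pool:
    futures = [pool.submit(mergePathsV1, fs, committedTask, dest)
               for committedTask in fs.listFiles(jobAttemptDir)]
    for f in futures:
      f.result()    # surface any failure
  # a failure still leaves the destination in an unknown state;
  # parallelism narrows the window of vulnerability, it does not close it
  fs.touch('$dest/_SUCCESS')
</pre></div></div>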
<div class="section">
<h3><a name="Job_Abort"></a>Job Abort</h3>
<p>Delete all data under job attempt path.</p>
<div>
<div>
<pre class="source">def abortJob(fs, jobAttemptDir, dest):
fs.delete(jobAttemptDir, recursive = True)
</pre></div></div>
</div>
<div class="section">
<h3><a name="Job_Cleanup"></a>Job Cleanup</h3>
<div>
<div>
<pre class="source">def cleanupJob(fs, dest):
fs.delete('$dest/_temporary', recursive = True)
</pre></div></div>
</div>
<div class="section">
<h3><a name="Job_Recovery_Before_commitJob.28.29"></a>Job Recovery Before <tt>commitJob()</tt></h3>
<p>For all committers, the recovery process takes place in the application master.</p>
<ol style="list-style-type: decimal">
<li>The job history file of the previous attempt is loaded and scanned to determine which tasks were recorded as having succeeded.</li>
<li>For each successful task, the job committer has its <tt>recoverTask()</tt> method invoked with a <tt>TaskAttemptContext</tt> built from the previous attempt&#x2019;s details.</li>
<li>If the method does not raise an exception, the task is considered to have been recovered, and is not re-executed.</li>
<li>All other tasks are queued for execution.</li>
</ol>
<p>For the v1 committer, task recovery is straightforward. The directory of the committed task from the previous attempt is moved under the directory of the current application attempt.</p>
<div>
<div>
<pre class="source">def recoverTask(tac):
oldAttemptId = appAttemptId - 1
fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}',
'$dest/_temporary/appAttemptId/${tac.taskId}')
</pre></div></div>
<p>This significantly improves the time to recover from Job driver (here MR AM) failure. The only lost work is that of the tasks in progress, that is, those which had generated data but were not yet committed.</p>
<p>Only the failure of the job driver requires a job restart, not that of an individual task. Therefore the probability of this happening is independent of the number of tasks executed in parallel; it depends simply on the duration of the query.</p>
<p>The longer the task, the higher the risk of failure, the more value there is in recovering the work in progress.</p>
<p>Fast queries not only have a lower risk of failure, they can recover from failure simply by rerunning the entire job. This is implicitly the strategy in Spark, which does not attempt to recover any in-progress jobs. The faster your queries, the simpler your recovery strategy needs to be.</p></div>
<div class="section">
<h3><a name="Job_Recovery_During_commitJob.28.29"></a>Job Recovery During <tt>commitJob()</tt></h3>
<p>This is not possible; a failure during job commit requires the entire job to be re-executed after cleaning up the destination directory.</p></div>
<div class="section">
<h3><a name="mergePaths.28FileSystem_fs.2C_FileStatus_src.2C_Path_dest.29_Algorithm"></a><tt>mergePaths(FileSystem fs, FileStatus src, Path dest)</tt> Algorithm</h3>
<p><tt>mergePaths()</tt> is the core algorithm to merge data; it is somewhat confusing as the implementation mixes the strategies for both algorithms across two co-recursive routines, <tt>mergePaths()</tt> and <tt>renameOrMerge()</tt>.</p>
<p>Here the two algorithms have been split, and one of the co-recursive methods inlined.</p>
<div>
<div>
<pre class="source">def mergePathsV1(fs, src, dest) :
if fs.exists(dest) :
toStat = fs.getFileStatus(dest)
else:
toStat = None
if src.isFile :
if not toStat is None :
fs.delete(dest, recursive = True)
fs.rename(src.getPath, dest)
else :
# src is directory, choose action on dest type
if not toStat is None :
if not toStat.isDirectory :
# Destination exists and is not a directory
fs.delete(dest)
fs.rename(src.getPath(), dest)
else :
# Destination exists and is a directory
# merge all children under destination directory
for child in fs.listStatus(src.getPath) :
mergePathsV1(fs, child, dest + child.getName)
else :
# destination does not exist
fs.rename(src.getPath(), dest)
</pre></div></div>
</div></div>
<div class="section">
<h2><a name="The_v2_Commit_Algorithm"></a>The v2 Commit Algorithm</h2>
<p>The v2 algorithm directly commits task output into the destination directory. It is essentially a re-implementation of the Hadoop 1.x commit algorithm.</p>
<ol style="list-style-type: decimal">
<li>During execution, intermediate data becomes visible.</li>
<li>On a failure, all output must be deleted and the job restarted.</li>
</ol>
<p>It implements <tt>mergePaths</tt> differently, as shown below.</p>
<div>
<div>
<pre class="source">def mergePathsV2(fs, src, dest) :
if fs.exists(dest) :
toStat = fs.getFileStatus(dest)
else:
toStat = None
if src.isFile :
if not toStat is None :
fs.delete(dest, recursive = True)
fs.rename(src.getPath, dest)
else :
# destination is directory, choose action on source type
if src.isDirectory :
if not toStat is None :
if not toStat.isDirectory :
# Destination exists and is not a directory
fs.delete(dest)
fs.mkdirs(dest) #
for child in fs.listStatus(src.getPath) : # HERE
mergePathsV2(fs, child, dest + child.getName) #
else :
# Destination exists and is a directory
# merge all children under destination directory
for child in fs.listStatus(src.getPath) :
mergePathsV2(fs, child, dest + child.getName)
else :
# destination does not exist
fs.mkdirs(dest) #
for child in fs.listStatus(src.getPath) : # HERE
mergePathsV2(fs, child, dest + child.getName) #
</pre></div></div>
<p>Both algorithms recurse down any source directory tree, and commit single files by renaming them.</p>
<p>A key difference is that the v1 algorithm commits a source directory via a single directory rename, which is traditionally an <tt>O(1)</tt> operation.</p>
<p>In contrast, the v2 algorithm lists all direct children of a source directory and recursively calls <tt>mergePath()</tt> on them, ultimately renaming the individual files. As such, the number of renames it performs equals the number of source <i>files</i>, rather than the number of source <i>directories</i>; the number of directory listings being <tt>O(depth(src))</tt>, where <tt>depth(path)</tt> is a function returning the depth of directories under the given path.</p>
<p>On a normal filesystem, the v2 merge algorithm is potentially more expensive than the v1 algorithm. However, as the merging only takes place in task commit, it is potentially less of a bottleneck in the entire execution process.</p>
<p>On an object store, it is suboptimal not just from its expectation that <tt>rename()</tt> is an <tt>O(1)</tt> operation, but from its expectation that a recursive tree walk is an efficient way to enumerate and act on a tree of data. If the algorithm were switched to using <tt>FileSystem.listFiles(path, recursive)</tt> for a single call to enumerate all children under a path, then the listing operation would be significantly faster, at least on a deep or wide tree. However, for any realistic dataset, the size of the output files is likely to be the main cause of delays. That is, if the cost of <tt>mergePathsV2</tt> is <tt>O(depth(src)) + O(data)</tt>, then generally the <tt>O(data)</tt> value will be more significant than the <tt>depth(src)</tt>.</p>
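<p>A sketch of that flat-listing enumeration, again in the document&#x2019;s pseudocode style; the rename loop is illustrative only and omits the directory-creation and overwrite handling of the real merge:</p>
<div>
<div>
<pre class="source">def mergePathsFlatListing(fs, srcDir, destDir):
  # a single deep listing enumerates every file under srcDir,
  # whatever the depth or width of the directory tree
  for status in fs.listFiles(srcDir, recursive = True):
    relative = status.getPath()[len(srcDir):]
    fs.rename(status.getPath(), destDir + relative)
</pre></div></div>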
<p>There is one key exception: tests which work on small amounts of data yet try to generate realistic output directory structures. In these tests the cost of listing directories and calling <tt>getFileStatus()</tt> could exceed that of the copy calls. This is why small-scale tests of the commit algorithms against object stores must be considered significantly misleading.</p>
<div class="section">
<h3><a name="v2_Task_Commit"></a>v2 Task Commit</h3>
<p>Merge the files under the task attempt path directly into the destination directory.</p>
<div>
<div>
<pre class="source">def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
return fs.exists(taskAttemptPath)
def commitTask(fs, jobAttemptPath, taskAttemptPath, dest):
if fs.exists(taskAttemptPath) :
mergePathsV2(fs, taskAttemptPath, dest)
</pre></div></div>
</div>
<div class="section">
<h3><a name="v2_Task_Abort"></a>v2 Task Abort</h3>
<p>Delete task attempt path.</p>
<div>
<div>
<pre class="source">def abortTask(fs, jobAttemptPath, taskAttemptPath, dest):
fs.delete(taskAttemptPath, recursive=True)
</pre></div></div>
<p>Cost: <tt>O(1)</tt> for normal filesystems, <tt>O(files)</tt> for object stores.</p></div>
<div class="section">
<h3><a name="v2_Job_Commit"></a>v2 Job Commit</h3>
<p>As the output of every committed task has already been merged into the destination, all that is needed is to touch the <tt>_SUCCESS</tt> marker.</p>
<div>
<div>
<pre class="source">def commitJob(fs, jobAttemptDir, dest):
fs.touch(&quot;$dest/_SUCCESS&quot;)
</pre></div></div>
<p>Cost: <tt>O(1)</tt></p>
<p>A failure during job commit is implicitly repeatable:</p>
<div>
<div>
<pre class="source">def isCommitJobRepeatable() :
return True
</pre></div></div>
</div>
<div class="section">
<h3><a name="v2_Job_Abort"></a>v2 Job Abort</h3>
<p>Delete all data under job attempt path.</p>
<div>
<div>
<pre class="source">def abortJob(fs, jobAttemptDir, dest):
fs.delete(jobAttemptDir, recursive=True)
</pre></div></div>
<p>Cost: <tt>O(1)</tt> for normal filesystems, <tt>O(files)</tt> for object stores.</p></div>
<div class="section">
<h3><a name="v2_Task_Recovery"></a>v2 Task Recovery</h3>
<p>As no data is written to the destination directory until the task is committed, an uncommitted task can be cleaned up by deleting the task attempt directory.</p></div>
<div class="section">
<h3><a name="v2_Job_Recovery_Before_commitJob.28.29"></a>v2 Job Recovery Before <tt>commitJob()</tt></h3>
<p>Because the data has been renamed into the destination directory, all tasks recorded as having been committed have no recovery needed at all:</p>
<div>
<div>
<pre class="source">def recoverTask(tac):
</pre></div></div>
<p>All active and queued tasks are scheduled for execution.</p>
<p>There is a weakness here, the same one as on a failure during <tt>commitTask()</tt>: it is only safe to repeat a task which failed during that commit operation if the names of all generated files are constant across all task attempts.</p>
<p>If the Job AM fails while a task attempt has been instructed to commit, and that commit is not recorded as having completed, the state of that in-progress task is unknown&#x2026; really, it isn&#x2019;t safe to recover the job at this point.</p></div>
<div class="section">
<h3><a name="v2_Job_Recovery_During_commitJob.28.29"></a>v2 Job Recovery During <tt>commitJob()</tt></h3>
<p>This is straightforward: <tt>commitJob()</tt> is re-invoked.</p></div></div>
<div class="section">
<h2><a name="How_MapReduce_uses_the_committer_in_a_task"></a>How MapReduce uses the committer in a task</h2>
<p>MapReduce runs each Mapper or Reducer in its own container; it gets its own process. Implicitly, this also means that each task gets its own instance of every filesystem.</p>
<p>The work is choreographed in <tt>org.apache.hadoop.mapred.Task</tt>, for which there are specific subclasses for the Map (<tt>MapTask</tt>) and Reduce (<tt>ReduceTask</tt>) sides. There are also cleanup tasks whose role is simpler: clean things up, and a Job setup task which runs <tt>OutputCommitter.setupJob</tt>. That is: even the job setup phase is run in a Task.</p>
<p>MapTask uses the committer to write the output of all the mappers into the filesystem, ready for the reducers. Each partition writes its data as a <tt>MapFile</tt>, which is actually two <tt>SequenceFile</tt> files in a directory: the <tt>data</tt> file of all Key-Value output, and the <tt>index</tt> file, which contains an index of some of the keys in the file.</p>
<p>This is all written to the local filesystem.</p>
<p>The <tt>ReduceTask</tt> does the final write to the destination filesystem.</p>
<p>Because the Map phase uses a committer to commit intermediate work, any plug-in committer supplied to a process through any extension mechanism <i>must</i> work with the output generated by a mapper. The staging committers do so only if unique filenames are disabled, but as they and the magic committers are only meant to be used for the final output of work, it is somewhat moot. What is important is to be able to use different committers for the map phase and the final reduce. This is implicit if separate committers are defined for different filesystems: the new committer can be defined for the final destination FS, while <tt>file://</tt> can retain the default <tt>FileOutputCommitter</tt>.</p>
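<p>As an illustration of that split, committer selection can be bound to a filesystem schema. The property names below are those described in the S3A committer documentation, shown here simply as key/value pairs; treat the exact values as an example rather than a recommended configuration:</p>
<div>
<div>
<pre class="source"># illustrative configuration properties, expressed as a Python dict
committer_config = {
    # bind a committer factory to the s3a:// schema only
    'mapreduce.outputcommitter.factory.scheme.s3a':
        'org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory',
    # choose which S3A committer that factory creates
    'fs.s3a.committer.name': 'magic',
    # file:// and hdfs:// destinations keep the default FileOutputCommitter
}
</pre></div></div>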
<div class="section">
<h3><a name="Task_Setup"></a>Task Setup</h3>
<p><tt>Task.initialize()</tt>: read in the configuration, instantiate the <tt>JobContextImpl</tt> and <tt>TaskAttemptContextImpl</tt> instances bonded to the current job &amp; task.</p></div>
<div class="section">
<h3><a name="Task_Ccommit"></a>Task Ccommit</h3>
<p>After the work is completed, <tt>Task.done()</tt> is invoked, which is essentially the following codepath:</p>
<div>
<div>
<pre class="source">if (committer.needsTaskCommit(taskContext)) {
// get permission to commit from AM
int retries = MAX_RETRIES;
while(true) {
try {
umbilical.commitPending(taskId, taskStatus);
break;
} catch(IOException ie) {
if (--retries == 0) {
// FAIL WITHOUT CALLING ABORT
System.exit(67);
}
}
}
// commit the work
try {
committer.commitTask(taskContext);
} catch (IOException iee) {
// failure: abort
try {
committer.abortTask(taskContext);
} catch (IOException ioe) {
LOG.warn(&quot;Failure cleaning up: &quot; +
StringUtils.stringifyException(ioe));
}
throw iee;
}
}
</pre></div></div>
<p>That is: if and only if there is data to write, the Task requests clearance to do so from the AM. This ensures that speculative work is not committed if another task has written it, <i>and that even non-speculative work is not committed if the task has lost contact with the AM</i>.</p>
<p>That is: task output is not committed if the Job&#x2019;s AM has failed or is in a separate network partition from the Task.</p>
<p>If permission is granted, the commit call is invoked. If this doesn&#x2019;t work, <tt>abortTask()</tt> is called; all failures there are logged and swallowed.</p>
<p>This method actually appears to be limited in that it does not guarantee that <tt>committer.abortTask()</tt> is always called: if the <tt>umbilical.commitPending()</tt> calls fail repeatedly and the Task process aborts, <tt>committer.abortTask()</tt> is not called. If this is due to a network partition, we can hope that the AM invokes <tt>committer.abortTask()</tt>, and that if it is an AM failure then a restarted AM can clean up its previous attempts. For the classic FileOutputCommitter, listing and deleting the previous attempt&#x2019;s data is straightforward. However, for S3 committers using Multipart Upload as the means of uploading uncommitted data, it is critical to ensure that pending uploads are always aborted. This can be done by</p>
<ul>
<li>Making sure that all task-side failure branches in <tt>Task.done()</tt> call <tt>committer.abortTask()</tt>.</li>
<li>Having job commit &amp; abort clean up all pending multipart writes to the same directory tree. That is: require that no other jobs are writing to the same tree, and so list all pending operations and cancel them, as sketched after this list.</li>
<li>Add a CLI command to verify that there are no commits under a path; returning a non-zero exit code if there are. This can be used in testing.</li>
<li>Add a CLI command to purge commits under a path. This can be used in workflows, if felt necessary, and in test runs to isolate the output of different tests.</li>
<li>Document a recommended purge-pending-commit timeout for the system.</li>
<li>Add probes in the integration tests to verify that pending uploads are never present after a write.</li>
</ul></div>
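<p>A sketch of that job-level purge, in the document&#x2019;s pseudocode style. The <tt>listMultipartUploads</tt> and <tt>abortMultipartUpload</tt> calls stand in for the S3 ListMultipartUploads and AbortMultipartUpload operations; the assumption that no other job is writing under the destination is exactly the constraint stated above.</p>
<div>
<div>
<pre class="source">def purgePendingUploads(s3, destDir):
  # enumerate every incomplete multipart upload under the destination
  for upload in s3.listMultipartUploads(prefix = destDir):
    # aborting discards the uploaded parts and stops further storage charges
    s3.abortMultipartUpload(upload.key, upload.uploadId)
</pre></div></div>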
<div class="section">
<h3><a name="How_MapReduce_uses_the_committer_in_the_Application_Master"></a>How MapReduce uses the committer in the Application Master</h3>
<p>The AM uses a committer to set up and commit the job. To support failure and recovery of the AM, <tt>OutputCommitter.isRecoverySupported()</tt> is used to declare whether all the output of successful tasks can be used in the final job, or the entire job needs to be reset and repeated. <tt>OutputCommitter.isCommitJobRepeatable()</tt> addresses the other question: can a committer recover from a failure of the commit process itself.</p>
<p>A committer is created in the Application Master, and handed to an instance of <tt>org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler</tt>, which then manages the job-level lifecycle.</p>
<p>If the MRv1 API is used, the committer is chosen from the value of <tt>&quot;mapred.output.committer.class&quot;</tt>; in the MRv2 API the output format is instantiated, then asked for a committer using a task and task attempt ID of 0. A committer is obtained from the output format via a call to <tt>Committer.getOutputCommitter(taskContext)</tt>, again using the task attempt context with the (job, 0, 0) task and attempt IDs. That is: even for the job committers, a task context is always passed in to the <tt>OutputFormat</tt> when requesting a committer. <i>It is critical for all implementations of <tt>OutputCommitter.abortTask()</tt> to be able to execute from the AM, rather than the container and host running the task.</i> Furthermore, all information needed for the abort (paths, filesystem instances &amp;c) <i>must</i> be retrieved from the <tt>TaskAttemptContext</tt> passed to the method, rather than relying on fields initialized from the context passed to the constructor.</p>
<div class="section">
<h4><a name="AM:_Job_setup:_OutputCommitter.setupJob.28.29"></a>AM: Job setup: <tt>OutputCommitter.setupJob()</tt></h4>
<p>This is initiated in <tt>org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.StartTransition</tt>. It is queued for asynchronous execution in <tt>org.apache.hadoop.mapreduce.v2.app.MRAppMaster.startJobs()</tt>, which is invoked when the service is started. Thus: the job is set up when the AM is started.</p></div>
<div class="section">
<h4><a name="AM:_Job_Commit:_OutputCommitter.commitJob.28.29"></a>AM: Job Commit: <tt>OutputCommitter.commitJob()</tt></h4>
<p>In the &#x201c;default&#x201d;/cluster filesystem, the <tt>CommitterEventHandler</tt> uses data in the staging area defined in <tt>yarn.app.mapreduce.am.staging-dir</tt> (default <tt>/tmp/hadoop-yarn/staging/$user/.staging</tt>), in a subdirectory named from the job ID.</p>
<p>Three paths are built up for using the filesystem for tracking the state of the commit process, with a goal of making the commit operation recoverable, where supported.</p>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> name </th>
<th> role </th></tr>
</thead><tbody>
<tr class="b">
<td> <tt>COMMIT_STARTED</tt> </td>
<td> mark the commencement of the job commit </td></tr>
<tr class="a">
<td> <tt>COMMIT_SUCCESS</tt> </td>
<td> mark the successful completion of the job commit </td></tr>
<tr class="b">
<td> <tt>COMMIT_FAIL</tt> </td>
<td> mark the failure of the job commit </td></tr>
</tbody>
</table>
<p>These markers are used to manage job restart/failure if they happen during the job commit itself.</p>
<p>When an AM starts up, it looks in its staging area for these files as a way to determine the previous state of the job. If there are no <tt>COMMIT_</tt> marker files, the job is considered not to have attempted to commit itself yet.</p>
<p>The presence of <tt>COMMIT_SUCCESS</tt> or <tt>COMMIT_FAIL</tt> is taken as evidence that the previous job completed successfully or unsuccessfully; the AM then completes with a success/failure error code, without attempting to rerun the job.</p>
<p>If <tt>COMMIT_STARTED</tt> exists but neither of the completion markers does, and the committer declares that its job commit operation is repeatable (<tt>Committer.isCommitJobRepeatable(jobContext) == true</tt>), then an attempt is made to recommit the job, deleting the <tt>COMMIT_STARTED</tt> file and commencing the commit process again.</p>
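<p>The restart-time decision can be summarized as follows; this is a sketch of the logic described above in the document&#x2019;s pseudocode style, not the actual MR AM code, and the handling of a non-repeatable interrupted commit is an assumption:</p>
<div>
<div>
<pre class="source">def decideOnAMStartup(fs, stagingDir, committer, jobContext):
  if fs.exists(stagingDir + '/COMMIT_SUCCESS'):
    return 'previous attempt committed: report success'
  if fs.exists(stagingDir + '/COMMIT_FAIL'):
    return 'previous commit failed: report failure'
  if fs.exists(stagingDir + '/COMMIT_STARTED'):
    if committer.isCommitJobRepeatable(jobContext):
      fs.delete(stagingDir + '/COMMIT_STARTED', recursive = False)
      return 'recommit the job'
    else:
      # assumption: outcome unknown, so the job cannot safely be recommitted
      return 'fail the job'
  return 'no commit attempted yet: run/recover the tasks'
</pre></div></div>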
<p>These <tt>COMMIT_STARTED</tt> files are simply 0-byte files, but are created with the overwrite bit only set to true if the job commit is considered repeatable:</p>
<div>
<div>
<pre class="source">private void touchz(Path p, boolean overwrite) throws IOException {
fs.create(p, overwrite).close();
}
</pre></div></div>
<p>That is: the atomicity of the <tt>create(path, overwrite=false)</tt> on the cluster filesystem is used to guarantee that only one process will attempt to commit a specific job.</p>
<div>
<div>
<pre class="source">boolean commitJobIsRepeatable = committer.isCommitJobRepeatable(
event.getJobContext());
try {
touchz(startCommitFile, commitJobIsRepeatable);
waitForValidCommitWindow();
committer.commitJob(event.getJobContext());
touchz(endCommitSuccessFile, commitJobIsRepeatable);
} catch (Exception e) {
touchz(endCommitFailureFile, commitJobIsRepeatable);
}
</pre></div></div>
<p>The <tt>waitForValidCommitWindow()</tt> operation is important: it declares that the committer must not commit unless there has been communication with the YARN Resource Manager within <tt>yarn.app.mapreduce.am.job.committer.commit-window</tt> milliseconds (default: 10,000). It does this by waiting until the next heartbeat is received. There&#x2019;s a possible bug here: if the interval is set too small, the thread may permanently spin waiting for a callback within the window; a sketch of the window check follows the numbered list below. Ignoring that, this algorithm guarantees that</p>
<ol style="list-style-type: decimal">
<li>
<p>As only one call can create a file with <tt>overwrite=false</tt>, only one process&#x2019;s attempt to commit a non-repeatable job will proceed</p>
</li>
<li>
<p>Only a process in contact with the YARN RM within the configured window may commit a job.</p>
</li>
<li>
<p>If the AM is partitioned from the rest of the network, provided that its clock is monotonically increasing at the same rate as the rest of the cluster, then the rest of the cluster can be confident that, <tt>yarn.app.mapreduce.am.job.committer.commit-window</tt> milliseconds after the AM last successfully heartbeated to the YARN RM, the output of this job attempt will <i>never</i> be committed. This permits more than one job attempt to run simultaneously, but helps ensure that only one of them will attempt to commit.</p>
</li>
<li>
<p>Provided YARN heartbeats are only sent to the AM which successfully created the <tt>COMMIT_STARTED</tt> file, it will initiate the commit operation.</p>
</li>
</ol>
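<p>A sketch of that window check, in the document&#x2019;s pseudocode style; the heartbeat bookkeeping shown here is illustrative, as the real handler blocks on the RM heartbeat events themselves:</p>
<div>
<div>
<pre class="source">def waitForValidCommitWindow(rmHeartbeats, commitWindowMillis):
  # only proceed once the last successful RM heartbeat is recent enough
  while now() - rmHeartbeats.lastSuccessTime() > commitWindowMillis:
    # block until the next heartbeat arrives, then re-check the window
    rmHeartbeats.awaitNext()
</pre></div></div>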
<p>Possible issues with this algorithm:</p>
<ul>
<li>
<p>The <tt>COMMIT_STARTED</tt> file is created before waiting to get a heartbeat. It may be that this AM has lost contact with YARN, but doesn&#x2019;t know it yet. When the YARN liveness protocols eventually time out, the AM will correctly terminate, but as the <tt>COMMIT_STARTED</tt> file has been created at this point, no other launched AM will be able to commit.</p>
</li>
<li>
<p>If two committers attempt to create <tt>COMMIT_STARTED</tt> files for a non-repeatable commit, one will succeed, wait for a heartbeat, then attempt a (possibly slow) commit. The second committer will fail, and will <i>immediately</i> create a <tt>COMMIT_FAILED</tt> file. As a result, the state of the staging area will imply that the commit has failed, when really it is in progress, and that only the second process failed.</p>
</li>
</ul>
<p>It would seem safer to address this through:</p>
<ol style="list-style-type: decimal">
<li>Waiting for a heartbeat before creating the <tt>COMMIT_STARTED</tt> file.</li>
<li>Maybe: not creating the <tt>COMMIT_FAILED</tt> file if the failure happens when trying to create the <tt>COMMIT_STARTED</tt> file. That is: only a process which successfully created the <tt>COMMIT_STARTED</tt> file may indicate that a commit has failed.</li>
</ol></div></div>
<div class="section">
<h3><a name="AM:_Cancelling_job_commit"></a>AM: Cancelling job commit</h3>
<p>The thread performing the commit is interrupted; the <tt>CommitterEventHandler</tt> waits for it to finish (the timeout is set in <tt>yarn.app.mapreduce.am.job.committer.cancel-timeout</tt>, in milliseconds).</p></div>
<div class="section">
<h3><a name="AM:_Task_Abort"></a>AM: Task Abort</h3>
<p>The AM may call the <tt>OutputCommitter.taskAbort()</tt> with a task attempt context, when handling the failure/loss of a container. That is: on container failure, the task abort operation is executed in the AM, using the AM&#x2019;s committer. This avoids the need to create a new container, and means that the &#x201c;best-effort&#x201d; task abort does cope with container failures.</p>
<p>A partition between the AM and the task container means that this AM-executed task abort may take place while a task in the partitioned container is still executing. Unless output writing operations fail after the abort operation, the partitioned task may not become aware of the partition until its own task commit sequence in <tt>Task.done()</tt>, when <tt>talkToAMTGetPermissionToCommit()</tt> is invoked.</p>
<h1>Requirements of an S3A Committer</h1>
<p>The design requirements of the S3A committer were</p>
<ol style="list-style-type: decimal">
<li>Support an eventually consistent S3 object store as a reliable direct destination of work through the S3A filesystem client.</li>
<li>Efficient: implies no rename, and a minimal amount of delay in the job driver&#x2019;s task and job commit phases,</li>
<li>Support task failure and speculation.</li>
<li>Can be used by existing code: Hadoop MapReduce, Spark, Hive.</li>
<li>Retrofittable to existing subclasses of FileOutputFormat and/or compatible with committers which expect a specific FileOutputFormat.</li>
<li>Clean up uncommitted data from all task attempts, all previous attempts of the job, and any previous incomplete jobs.</li>
<li>Security: not to permit privilege escalation from other users with write access to the same file system(s).</li>
</ol></div></div>
<div class="section">
<h2><a name="Features_of_S3_and_the_S3A_Client"></a>Features of S3 and the S3A Client</h2>
<p>A core problem is that <a href="../../../hadoop-project-dist/hadoop-common/filesystem/introduction.html">object stores are not filesystems</a>; how <tt>rename()</tt> has been emulated in the S3A client means that both the existing MR committer algorithms have significant performance problems.</p>
<ol style="list-style-type: decimal">
<li>Single-object renames are implemented as a copy and delete sequence.</li>
<li>COPY is atomic, but overwrites cannot be prevented.</li>
<li>[Obsolete] Amazon S3 is eventually consistent on listings, deletes and updates.</li>
<li>[Obsolete] Amazon S3 has create consistency, however, the negative response of a HEAD/GET performed on a path before an object was created can be cached, unintentionally creating a create inconsistency. The S3A client library does perform such a check, on <tt>create()</tt> and <tt>rename()</tt> to check the state of the destination path, and so, whether the operation is permitted.</li>
<li>Multi-object renames are sequential or parallel single object COPY+DELETE operations: non atomic, <tt>O(data)</tt> and, on failure, can leave the filesystem in an unknown state.</li>
<li>There is a PUT operation, capable of uploading 5GB of data in one HTTP request.</li>
<li>The PUT operation is atomic, but there is no PUT-no-overwrite option.</li>
<li>There is a multipart POST/PUT sequence for uploading larger amounts of data in a sequence of PUT requests.</li>
</ol>
<p>The Hadoop S3A Filesystem client supports PUT and multipart PUT for uploading data, with the <tt>S3ABlockOutputStream</tt> of HADOOP-13560 uploading written data as parts of a multipart PUT once the threshold set in the configuration parameter <tt>fs.s3a.multipart.size</tt> (default: 100MB) is reached.</p>
<p><a href="./s3guard.html">S3Guard</a> added an option of consistent view of the filesystem to all processes using the shared DynamoDB table as the authoritative store of metadata. The proposed algorithm was designed to work with such object stores without the need for any DynamoDB tables. Since AWS S3 became consistent in 2020, this means that they will work directly with the store.</p></div>
<div class="section">
<h2><a name="Related_work:_Spark.E2.80.99s_DirectOutputCommitter"></a>Related work: Spark&#x2019;s <tt>DirectOutputCommitter</tt></h2>
<p>One implementation to look at is the <a class="externalLink" href="https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala"><tt>DirectOutputCommitter</tt> of Spark 1.6</a>.</p>
<p>This implements a zero rename commit by subclassing the <tt>ParquetOutputCommitter</tt> and then</p>
<ol style="list-style-type: decimal">
<li>Returning the destination directory as the task working directory.</li>
<li>Subclassing all the task commit/abort operations to be no-ops.</li>
</ol>
<p>With the working directory as the destination directory, there is no need to move/rename the task output on a successful commit. However, it is flawed. There is no notion of &#x201c;committing&#x201d; or &#x201c;aborting&#x201d; a task, hence no ability to handle speculative execution or failures. This is why the committer was removed in Spark 2 (<a class="externalLink" href="https://issues.apache.org/jira/browse/SPARK-10063">SPARK-10063</a>).</p>
<p>There is also the issue that work-in-progress data is visible; this may or may not be a problem.</p></div>
<div class="section">
<h2><a name="Related_work:_IBM.E2.80.99s_.E2.80.9CStocator.E2.80.9D_committer"></a>Related work: IBM&#x2019;s &#x201c;Stocator&#x201d; committer</h2>
<p>IBM&#x2019;s <a class="externalLink" href="https://github.com/SparkTC/stocator">Stocator</a> can transform indirect writes of V1/V2 committers into direct writes to the destination directory.</p>
<p>How does it do this? It&#x2019;s a special Hadoop <tt>FileSystem</tt> implementation which recognizes writes to <tt>_temporary</tt> paths and translates them to writes to the base directory. As well as translating the write operation, it also supports a <tt>getFileStatus()</tt> call on the original path, returning details on the file at the final destination. This allows committing applications to verify the creation/existence/size of the written files (in contrast to the magic committer covered below).</p>
<p>The FS targets Openstack Swift, though other object stores are supportable through different backends.</p>
<p>This solution is innovative in that it appears to deliver the same semantics (and hence failure modes) as the Spark Direct OutputCommitter, but does not need any change in either Spark <i>or</i> the Hadoop committers. In contrast, the committers proposed here combine changes to the Hadoop MR committers for ease of pluggability with a new committer exclusively for S3, one strongly dependent upon and tightly integrated with the S3A Filesystem.</p>
<p>The simplicity of the Stocator committer is something to appreciate.</p></div>
<div class="section">
<h2><a name="Background:_The_S3_multi-part_PUT_mechanism"></a>Background: The S3 multi-part PUT mechanism</h2>
<p>In the <a class="externalLink" href="http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html">S3 REST API</a>, multipart uploads allow clients to upload a series of &#x201c;Parts&#x201d; of a file, then commit the upload with a final call.</p>
<ol style="list-style-type: decimal">
<li>
<p>Caller initiates a multipart request, including the destination bucket, key and metadata.</p>
<div>
<div>
<pre class="source">POST bucket.s3.aws.com/path?uploads
</pre></div></div>
<p>An <tt>UploadId</tt> is returned</p>
</li>
<li>
<p>Caller uploads one or more parts.</p>
<div>
<div>
<pre class="source">PUT bucket.s3.aws.com/path?partNumber=PartNumber&amp;uploadId=UploadId
</pre></div></div>
<p>The part number is used to declare the ordering of the PUTs; they can be uploaded in parallel and out of order. All parts <i>excluding the final part</i> must be 5MB or larger. Every upload completes with an etag returned</p>
</li>
<li>
<p>Caller completes the operation</p>
<div>
<div>
<pre class="source">POST /ObjectName?uploadId=UploadId
&lt;CompleteMultipartUpload&gt;
&lt;Part&gt;&lt;PartNumber&gt;(number)&lt;/PartNumber&gt;&lt;ETag&gt;(Tag)&lt;/ETag&gt;&lt;/Part&gt;
...
&lt;/CompleteMultipartUpload&gt;
</pre></div></div>
<p>This final call lists the etags of all uploaded parts and the actual ordering of the parts within the object.</p>
</li>
</ol>
<p>The completion operation is apparently <tt>O(1)</tt>; presumably the PUT requests have already uploaded the data to the server(s) which will eventually be serving up the data for the final path. All that is needed to complete the upload is to construct an object by linking together the files in the server&#x2019;s local filesystem and update an entry in the index table of the object store.</p>
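<p>A minimal sketch of the completion step using the AWS SDK for Java (v1) shows that nothing beyond the bucket, key, upload ID and part etags is needed; the bucket, key and argument handling here are illustrative:</p>
<div>
<div>
<pre class="source">import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import java.util.Arrays;
import java.util.List;

public class CompleteUpload {
  public static void main(String[] args) {
    // The upload ID and etags could have been produced by a different process.
    String uploadId = args[0];
    List&lt;PartETag&gt; parts = Arrays.asList(
        new PartETag(1, args[1]),
        new PartETag(2, args[2]));
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    // Completing the upload is what makes the object visible at the destination key.
    s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
        "example-bucket", "results/latest/latest.orc.lzo", uploadId, parts));
  }
}
</pre></div></div>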
<p>In the S3A client, all PUT calls in the sequence and the final commit are initiated by the same process. <i>This does not have to be the case</i>. It is the fact that a different process may perform different parts of the upload which makes this algorithm viable.</p></div>
<div class="section">
<h2><a name="The_Netflix_.E2.80.9CStaging.E2.80.9D_committer"></a>The Netflix &#x201c;Staging&#x201d; committer</h2>
<p>Ryan Blue, of Netflix, has submitted an alternate committer, one which has a number of appealing features</p>
<ul>
<li>Doesn&#x2019;t have any requirements of the destination object store, not even a need for a consistency layer.</li>
<li>Overall a simpler design.</li>
<li>Known to work.</li>
</ul>
<p>The final point is not to be underestimated, especially given the need to be resilient to the various failure modes which may arise.</p>
<p>The committer writes task outputs to a temporary directory on the local FS. Task outputs are directed to the local FS by <tt>getTaskAttemptPath</tt> and <tt>getWorkPath</tt>. On task commit, the committer enumerates files in the task attempt directory (ignoring hidden files). Each file is uploaded to S3 using the <a class="externalLink" href="http://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html">multi-part upload API</a>.</p>
<p>The information needed to commit the upload is saved into HDFS and committed via that protocol: when the job commits, the pending uploads of the successful tasks are all committed.</p>
<div class="section">
<h3><a name="Commit_logic"></a>Commit logic</h3>
<p>The core algorithm is as follows:</p>
<ol style="list-style-type: decimal">
<li>The destination directory for output (e.g. <tt>FileOutputFormat</tt> and subclasses) is a local <tt>file://</tt> reference.</li>
<li>Task commit initiates the multipart PUT to the destination object store.</li>
<li>A list of every pending PUT for the task is persisted to a single file within a consistent, cluster-wide filesystem. For Netflix, that is HDFS.</li>
<li>The Standard <tt>FileOutputCommitter</tt> (algorithm 1) is used to manage the commit/abort of these files. That is: it copies only those lists of files to commit from successful tasks into a (transient) job commit directory.</li>
<li>The S3 job committer reads the pending file list for every task committed in HDFS, and completes those put requests.</li>
</ol>
<p>By using the <tt>FileOutputCommitter</tt> to manage the propagation of the lists of files to commit, the existing commit algorithm implicitly becomes the one which defines which files will be committed at the end of the job.</p>
<p>The Netflix contribution has Hadoop <tt>OutputCommitter</tt> implementations for S3.</p>
<p>There are 3 main classes:</p>
<ul>
<li><tt>S3MultipartOutputCommitter</tt> is a base committer class that handles commit logic. This should not be used directly.</li>
<li><tt>S3DirectoryOutputCommitter</tt> for writing unpartitioned data to S3 with conflict resolution.</li>
<li><tt>S3PartitionedOutputCommitter</tt> for writing partitioned data to S3 with conflict resolution.</li>
</ul>
<p>Callers should use <tt>S3DirectoryOutputCommitter</tt> for single-directory outputs, or <tt>S3PartitionedOutputCommitter</tt> for partitioned data.</p>
<p>These S3 committers work by writing task outputs to a temporary directory on the local FS. Task outputs are directed to the local FS by <tt>getTaskAttemptPath</tt> and <tt>getWorkPath</tt>.</p></div>
<div class="section">
<h3><a name="Conflict_Resolution"></a>Conflict Resolution</h3>
<p>The single-directory and partitioned committers handle conflict resolution by checking whether target paths exist in S3 before uploading any data. There are three conflict resolution modes, controlled by setting <tt>fs.s3a.committer.staging.conflict-mode</tt>:</p>
<ul>
<li><tt>fail</tt>: Fail a task if an output directory or partition already exists. (Default)</li>
<li><tt>append</tt>: Upload data files without checking whether directories or partitions already exist.</li>
<li><tt>replace</tt>: If an output directory exists, delete it so the new data replaces the current content.</li>
</ul>
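<p>For example, a job wanting overwrite semantics could select the <tt>replace</tt> mode programmatically (a sketch; <tt>jobContext</tt> is assumed to be in scope, and the property name and values are those listed above):</p>
<div>
<div>
<pre class="source">Configuration conf = jobContext.getConfiguration();
// Options: "fail" (the default), "append", "replace".
conf.set("fs.s3a.committer.staging.conflict-mode", "replace");
</pre></div></div>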
<p>The partitioned committer enforces the conflict mode when a conflict is detected with output data, not before the job runs. Conflict resolution differs from an output mode because it does not enforce the mode when there is no conflict. For example, overwriting a partition should remove all sub-partitions and data it contains, whether or not new output is created. Conflict resolution will only replace partitions that have output data.</p>
<p>When the conflict mode is <tt>replace</tt>, conflicting directories are removed during job commit. Data is only deleted if all tasks have completed successfully.</p>
<p>A UUID that identifies a write is added to filenames that are uploaded to S3. This allows rolling back data from jobs that fail during job commit (see failure cases below) and avoids file-level conflicts when appending data to existing directories.</p>
<p><i>Note</i> the checks for existence are made via <tt>S3AFileSystem.getFileStatus()</tt> requests of the destination paths. Unless the view of the S3 store is consistent, it may be that a newly-deleted object is still discovered in the probe, so a commit may fail even when there is no longer any actual conflict.</p></div>
<div class="section">
<h3><a name="Performance"></a>Performance</h3>
<p>Compared to the previous proposal, henceforth the &#x201c;magic&#x201d; committer, this committer, the &#x201c;staging committer&#x201d;, adds the extra overhead of uploading each file at the end of every task. This is an <tt>O(data)</tt> operation; it can be parallelized, but is bounded by the bandwidth from compute node to S3, as well as the write/IOP capacity of the destination shard of S3. If many tasks complete at or near the same time, there may be a peak of bandwidth load slowing down the upload.</p>
<p>The time to commit will be the same and, given that the Netflix committer has already implemented the parallelization logic here, will be <tt>O(files/threads)</tt>.</p></div>
<div class="section">
<h3><a name="Resilience"></a>Resilience</h3>
<p>There&#x2019;s already a lot of code in the task and job commits to handle failure.</p>
<p>Any failure in a commit triggers a best-effort abort/revert of the commit actions for a task/job.</p>
<p>Task commits delegate to the <tt>FileOutputCommitter</tt> to ensure that only one task&#x2019;s output reaches the job commit.</p>
<p>Similarly, if a task is aborted, temporary output on the local FS is removed.</p>
<p>If a task dies while the committer is running, it is possible for data to be left on the local FS or as unfinished parts in S3. Unfinished upload parts in S3 are not visible to table readers and are cleaned up following the rules in the target bucket&#x2019;s life-cycle policy.</p>
<p>Failures during job commit are handled by deleting any files that have already been completed and aborting the remaining uploads. Because uploads are completed individually, the files that are deleted were visible to readers.</p>
<p>If the process dies while the job committer is running, there are two possible failures:</p>
<ol style="list-style-type: decimal">
<li>Some directories that would be replaced have been deleted, but no new data is visible.</li>
<li>Some new data is visible but is not complete, and all replaced directories have been removed. Only complete files are visible.</li>
</ol>
<p>If the process dies during job commit, cleaning up is a manual process. File names include a UUID for each write so that files can be identified and removed.</p>
<p><b>Failure during task execution</b></p>
<p>All data is written to local temporary files; these need to be cleaned up.</p>
<p>The job must ensure that the local (pending) data is purged.</p>
<p><b>Failure during task commit</b></p>
<p>A process failure during the upload process will result in the list of pending multipart PUTs to <i>not</i> be persisted to the cluster filesystem. This window is smaller than the entire task execution, but still potentially significant, at least for large uploads.</p>
<p>Per-file persistence, or incremental overwrites of the upload list, may reduce the problems here, but there would still be a small risk of an outstanding multipart upload not being recorded.</p>
<p><b>Explicit Task abort before task commit</b></p>
<p>Task will delete all local data; no uploads will be initiated.</p>
<p><b>Failure to communicate with S3 during data upload</b></p>
<p>If an upload fails, tasks will:</p>
<ul>
<li>Retry using the retry policies implemented in the S3AFileSystem classes and the AWS libraries.</li>
<li>Eventually: attempt to abort outstanding multipart uploads already initiated.</li>
<li>Remove temporary files on the local FS.</li>
</ul>
<p><b>Explicit Job Abort</b></p>
<p>All in-progress tasks are aborted and cleaned up. The pending commit data of all completed tasks can be loaded, outstanding multipart PUT requests aborted.</p>
<p>This is done by</p>
<ol style="list-style-type: decimal">
<li>Listing all local files, with a best effort read attempt followed by an abort of all successfully read files.</li>
<li>List and abort all pending multipart uploads.</li>
</ol>
<p>Because of action #2, action #1 is superfluous. It is retained so as to leave open the option of making action #2 a configurable option &#x2014;which would be required to handle the use case of &gt;1 partitioned commit running simultaneously.</p>
<p><b>Job Driver failure before Job Commit</b></p>
<p>Because the local data is managed with the v1 commit algorithm, the second attempt of the job will recover all the outstanding commit data of the first attempt; those tasks will not be rerun.</p>
<p>This also ensures that on a job abort, the individual tasks&#x2019; .pendingset files can be read and used to initiate the abort of those uploads. That is: a recovered job can clean up the pending writes of the previous job.</p>
<p>If the query engine does not support multiple job attempts, then the pending commit data will not be recovered; an explicit abort operation will need to be initiated (we will add a CLI command for this), or the S3 bucket must be configured to automatically delete the pending request.</p>
<p><b>Job Driver failure during Job Commit</b></p>
<p>Those uploads already executed by a failed job commit will persist; those yet to execute will remain outstanding.</p>
<p>The committer currently declares itself as non-recoverable, but that may not actually hold, as the recovery process could be one of:</p>
<ol style="list-style-type: decimal">
<li>Enumerate all job commits from the .pendingset files (<i>:= Commits</i>).</li>
<li>List all outstanding uploads under the destination path (<i>:= Outstanding</i>).</li>
<li>List all written files, for better consistency via a GET call on the known filenames (<i>:= Written</i>).</li>
<li>Identify all files which are yet to be completed (<i>Commits - Written</i>).</li>
<li>Verify that the set of pending uploads matches (<i>Outstanding = (Commits - Written)</i>).</li>
</ol>
<p>The main problem here is the progress of the job-commit-time conflict resolution process: how to determine if it completed, as the only way to be confident that all files in the destination directory are to be retained is by knowing that the pre-commit phase completed. This could be implicitly determined based on the rule &#x201c;no uploads are committed until precommit is completed&#x201d;. If it can be determined that 1+ upload has completed, then it could be inferred that precommit had completed and so the job could be repeated.</p>
<p>This is dangerous territory to delve into. For now, the committer declares itself as unrecoverable.</p>
<p><b>Entire application failure before any task commit</b></p>
<p>Data is left on local systems, in the temporary directories. This may not be cleaned up.</p>
<p><b>Entire application failure after one or more task commits, before job commit</b></p>
<ul>
<li>A multipart PUT request will be outstanding for every pending write.</li>
<li>A temporary directory in HDFS will list all known pending requests.</li>
</ul>
<p><b>Job complete/abort after &gt;1 task failure</b></p>
<ol style="list-style-type: decimal">
<li>All pending put data listed in the job completion directory needs to be loaded and then cancelled.</li>
<li>Any other pending writes to the dest dir need to be enumerated and aborted. This catches the situation of a task failure before the output is written.</li>
<li>All pending data in local dirs need to be deleted.</li>
</ol>
<p>Issue: what about the destination directory: overwrite or not? It could well depend upon the merge policy.</p>
<div class="section">
<h4><a name="Overall_Resilience"></a>Overall Resilience</h4>
<ol style="list-style-type: decimal">
<li>The only time that incomplete work will appear in the destination directory is if the job commit operation fails partway through.</li>
<li>There&#x2019;s a risk of leakage of local filesystem data; this will need to be managed in the response to a task failure.</li>
<li>There&#x2019;s a risk of uncommitted multipart PUT operations remaining outstanding, operations which will run up bills until cancelled. (as indeed, so does the Magic Committer).</li>
</ol>
<p>For cleaning up PUT commits, as well as scheduled GC of uncommitted writes, we may want to consider having job setup list and cancel all pending commits to the destination directory, on the assumption that these are from a previous incomplete operation.</p>
<p>We should add a &#x201c;commit&#x201d; command to the S3guard CLI to probe for, list and abort pending requests under a path, e.g. <tt>--has-pending &lt;path&gt;</tt>, <tt>--list-pending &lt;path&gt;</tt>, <tt>--abort-pending &lt;path&gt;</tt>.</p></div></div></div>
<div class="section">
<h2><a name="The_.E2.80.9CMagic.E2.80.9D_Committer"></a>The &#x201c;Magic&#x201d; Committer</h2>
<p>Development on this committer began before Netflix donated their committer.</p>
<p>By making changes to the <tt>S3AFileSystem</tt> and the <tt>S3ABlockOutputStream</tt>, this committer manages to postpone the completion of writes of all files written to special (&#x201c;magic&#x201d;) directories; the final destination of the write being altered to that of the final job destination. When the job is committed, the pending writes are instantiated.</p>
<p>With the addition of the Netflix Staging committer, the actual committer code now shares common formats for the persistent metadata and shared routines for parallel committing of work, including all the error handling based on the Netflix experience.</p>
<p>It differs in that it directly streams data to S3 (there is no staging), and it also stores the lists of pending commits in S3 too. It requires a consistent S3 store.</p>
<div class="section">
<h3><a name="Core_concept:_A_new.2Fmodified_output_stream_for_delayed_PUT_commits"></a>Core concept: A new/modified output stream for delayed PUT commits</h3>
<p>This algorithm uses a modified <tt>S3ABlockOutputStream</tt> output stream which, rather than committing any active multipart upload in the final <tt>close()</tt> operation, instead saves enough information into the S3 repository for an independent process to be able to complete or abort the upload.</p>
<p>Originally, in <tt>OutputStream.close()</tt>, it chose whether to perform a single PUT or to complete an ongoing multipart write.</p>
<p>If a multipart PUT is in progress, then the stream waits for the ongoing uploads to complete (including any final block submitted), and then builds and PUTs the final multipart commit operation. The list of parts (and their ordering) has been built up during the write operation.</p>
<p>In contrast, when writing to a delayed-commit file:</p>
<ol style="list-style-type: decimal">
<li>
<p>A multipart write MUST always be initiated, even for small writes. This write MAY be initiated during the creation of the stream.</p>
</li>
<li>
<p>Instead of committing the write in the <tt>close()</tt> call, perform a PUT to a path in the S3A repository with all the information needed to commit the operation. That is: the final path, the multipart upload ID, and the ordered list of etags for the uploaded parts.</p>
</li>
</ol>
<p>Recognising when a file is &#x201c;special&#x201d; is problematic; the normal <tt>create(Path, boolean)</tt> call must recognize when the file being created is to be a delayed-commit file, and so return the special new stream.</p>
<p>This is done with a &#x201c;magic&#x201d; temporary directory name, <tt>__magic</tt>, to indicate that all files created under this path are not to be completed during the stream write process. Directories created under the path will still be created &#x2014;this allows job- and task-specific directories to be created for individual job and task attempts.</p>
<p>For example, the pattern <tt>__magic/${jobID}/${taskId}</tt> could be used to store pending commits to the final directory for that specific task. If that task is committed, all pending commit files stored in that path will be loaded and used to commit the final uploads.</p>
<p>Consider a job with the final directory <tt>/results/latest</tt></p>
<p>The intermediate directory for the task 01 attempt 01 of job <tt>job_400_1</tt> would be</p>
<div>
<div>
<pre class="source">/results/latest/__magic/job_400_1/_task_01_01
</pre></div></div>
<p>This would be returned as the temp directory.</p>
<p>When a client attempted to create the file <tt>/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo</tt>, the S3A FS would initiate a multipart request with the final destination of <tt>/results/latest/latest.orc.lzo</tt>.</p>
<p>As data was written to the output stream, it would be incrementally uploaded as individual multipart PUT operations</p>
<p>On <tt>close()</tt>, summary data would be written to the file <tt>/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending</tt>. This would contain the upload ID and all the parts and etags of uploaded data.</p>
<p>A marker file is also created, so that code which verifies that a newly created file exists does not fail.</p>
<ol style="list-style-type: decimal">
<li>These marker files are zero bytes long.</li>
<li>They declare the full length of the final file in the HTTP header <tt>x-hadoop-s3a-magic-data-length</tt>.</li>
<li>A call to <tt>getXAttr(&quot;header.x-hadoop-s3a-magic-data-length&quot;)</tt> will return a string containing the number of bytes in the data uploaded.</li>
</ol>
<p>This is needed so that the Spark write-tracking code can report how much data has been created.</p>
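<p>For example, a client holding the S3A filesystem instance could read that declared length through the standard <tt>FileSystem.getXAttr()</tt> call (a sketch; the attribute name is the one listed above and the path is from the earlier example):</p>
<div>
<div>
<pre class="source">// Returns the length of the data pending commit, not the (zero byte) marker file size.
byte[] raw = fs.getXAttr(
    new Path("/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo"),
    "header.x-hadoop-s3a-magic-data-length");
long declaredLength = Long.parseLong(
    new String(raw, java.nio.charset.StandardCharsets.UTF_8).trim());
</pre></div></div>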
<div class="section">
<h4><a name="Task_commit"></a>Task commit</h4>
<p>The information needed to commit a task is moved from the task attempt to the job attempt.</p>
<ol style="list-style-type: decimal">
<li>The task commit operation lists all <tt>.pending</tt> files in its attempt directory.</li>
<li>The contents are loaded into a list of single pending uploads.</li>
<li>These are merged into a single <tt>Pendingset</tt> structure.</li>
<li>Which is saved to a <tt>.pendingset</tt> file in the job attempt directory.</li>
<li>Finally, the task attempt directory is deleted. In the example, the <tt>.pendingset</tt> file would have been saved to <tt>/results/latest/__magic/job400_1/task_01_01.pendingset</tt>.</li>
</ol>
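<p>The shape of that sequence, as a sketch (the <tt>PendingUpload</tt> type and the <tt>loadPendingCommit()</tt>/<tt>savePendingSet()</tt> helpers are hypothetical stand-ins for the real serialization code):</p>
<div>
<div>
<pre class="source">void commitTask(FileSystem fs, Path taskAttemptDir, Path jobAttemptDir, String taskId)
    throws IOException {
  // steps 1 and 2: list and load every .pending file in the task attempt directory
  List&lt;PendingUpload&gt; uploads = new ArrayList&lt;&gt;();
  for (FileStatus status
      : fs.listStatus(taskAttemptDir, path -&gt; path.getName().endsWith(".pending"))) {
    uploads.add(loadPendingCommit(fs, status.getPath()));
  }
  // steps 3 and 4: merge into a single structure saved as a .pendingset
  savePendingSet(fs, new Path(jobAttemptDir, taskId + ".pendingset"), uploads);
  // step 5: remove the task attempt directory
  fs.delete(taskAttemptDir, true);
}
</pre></div></div>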
<p>A failure to load any of the single pending upload files (i.e. the file could not be loaded or was considered invalid) means the task is considered to have failed. All successfully loaded pending commits will be aborted, then the failure reported.</p>
<p>Similarly, a failure to save the <tt>.pendingset</tt> file will trigger an abort of all its pending uploads.</p></div>
<div class="section">
<h4><a name="Job_Commit"></a>Job Commit</h4>
<p>The job committer loads all <tt>.pendingset</tt> files in its job attempt directory.</p>
<p>A failure to load any of these files is considered a job failure; all pendingsets which could be loaded will be aborted.</p>
<p>If all pendingsets were loaded, then every pending commit in the job will be committed. If any one of these commits failed, then all successful commits will be reverted by deleting the destination file.</p></div>
<div class="section">
<h4><a name="Supporting_directory_trees"></a>Supporting directory trees</h4>
<p>To allow tasks to generate data in subdirectories, a special filename <tt>__base</tt> will be used to provide an extra cue as to the final path. When mapping an output path <tt>/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending</tt> to a final destination path, the path will become <tt>/results/latest/2017/2017-01-01.orc.lzo</tt>. That is: all directories between <tt>__magic</tt> and <tt>__base</tt> inclusive will be ignored.</p>
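<p>A sketch of that mapping rule (an assumed helper, not the actual S3A code):</p>
<div>
<div>
<pre class="source">/** e.g. /results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo
 *  maps to /results/latest/2017/2017-01-01.orc.lzo */
static Path finalDestination(Path magicFile) {
  List&lt;String&gt; elements = Arrays.asList(magicFile.toUri().getPath().split("/"));
  int magic = elements.indexOf("__magic");
  int base = elements.lastIndexOf("__base");
  // keep everything before __magic, then everything after __base
  List&lt;String&gt; result = new ArrayList&lt;&gt;(elements.subList(0, magic));
  result.addAll(elements.subList(base + 1, elements.size()));
  return new Path(String.join("/", result));
}
</pre></div></div>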
<p><b>Issues</b></p>
<p>Q. What if there are some non-<tt>.pending</tt> files in the task attempt directory?</p>
<p>A. This can only happen if the magic committer is being used in an S3A client which does not have the &#x201c;magic path&#x201d; feature enabled. This will be checked for during job and task committer initialization.</p></div></div>
<div class="section">
<h3><a name="Failure_cases"></a>Failure cases</h3>
<div class="section">
<h4><a name="Network_Partitioning"></a>Network Partitioning</h4>
<p>The job/task commit protocol is expected to handle this with the task only committing work when the job driver tells it to. A network partition should trigger the task committer&#x2019;s cancellation of the work (this is a protocol above the committers).</p></div>
<div class="section">
<h4><a name="Job_Driver_failure"></a>Job Driver failure</h4>
<p>The job will be restarted. When it completes it will delete all outstanding requests to the destination directory which it has not committed itself.</p></div>
<div class="section">
<h4><a name="Task_failure"></a>Task failure</h4>
<p>The task will be restarted. Pending work of the task will not be committed; when the job driver cleans up it will cancel pending writes under the directory.</p></div>
<div class="section">
<h4><a name="Multiple_jobs_targeting_the_same_destination_directory"></a>Multiple jobs targeting the same destination directory</h4>
<p>This leaves things in an indeterminate state.</p></div>
<div class="section">
<h4><a name="Failure_during_task_commit"></a>Failure during task commit</h4>
<p>Pending uploads will remain, but no changes will be visible.</p>
<p>If the <tt>.pendingset</tt> file has been saved to the job attempt directory, the task has effectively committed; it has just failed to report to the controller. This will cause complications during job commit, as there may be two task PendingSets committing the same files, or committing different data to the same destination paths.</p>
<p><i>Proposed</i>: track task ID in pendingsets, recognise duplicates on load and then respond by cancelling one set and committing the other. (or fail?)</p></div>
<div class="section">
<h4><a name="Failure_during_job_commit"></a>Failure during job commit</h4>
<p>The destination will be left in an unknown state.</p></div>
<div class="section">
<h4><a name="Failure_during_task.2Fjob_abort"></a>Failure during task/job abort</h4>
<p>Failures in the abort process are not well handled in either the committers or indeed in the applications which use these committers. If an abort operation fails, what can be done?</p>
<p>While somewhat hypothetical for the use case of a task being aborted due to the protocol (e.g. speculative jobs being aborted), the abort task/abort job calls may be made as part of the exception handling logic on a failure to commit. As such, the caller may assume that the abort does not fail: if it does, the newly thrown exception may hide the original problem.</p>
<p>Two options present themselves</p>
<ol style="list-style-type: decimal">
<li>Catch, log and swallow failures in the <tt>abort()</tt></li>
<li>Throw the exceptions, and expect the callers to handle them: review, fix and test that code as appropriate.</li>
</ol>
<p>Fixing the calling code does seem to be the best strategy, as it allows the failure to be explicitly handled in the commit protocol, rather than hidden in the committer.</p></div>
<div class="section">
<h4><a name="Preemption"></a>Preemption</h4>
<p>Preemption is the explicit termination of work at the behest of the cluster scheduler. It&#x2019;s a failure, but a special one: pre-empted tasks must not be counted as a failure in any code which only allows a limited number of trackers, and the Job driver can assume that the task was successfully terminated.</p>
<p>Job drivers themselves may be preempted.</p></div>
<div class="section">
<h4><a name="Cleaning_up_after_complete_job_failure"></a>Cleaning up after complete job failure</h4>
<p>One failure case is that the entire execution framework failed; a new process must identify outstanding jobs with pending work, and abort them, then delete the appropriate <tt>__magic</tt> directories.</p>
<p>This can be done either by scanning the directory tree for <tt>__magic</tt> directories and scanning underneath them, or by using the <tt>listMultipartUploads()</tt> call to list multipart uploads under a path, then cancel them. The most efficient solution may be to use <tt>listMultipartUploads</tt> to identify all outstanding requests, and use that to identify which requests to cancel, and where to scan for <tt>__magic</tt> directories. This strategy should address scalability problems when working with repositories with many millions of objects &#x2014;rather than list all keys searching for those with <tt>/__magic/**/*.pending</tt> in their name, work backwards from the active uploads to the directories with the data.</p>
<p>We may also want to consider having a cleanup operation in the S3 CLI to do the full tree scan and purge of pending items; give some statistics on what was found. This will keep costs down and help us identify problems related to cleanup.</p></div></div>
<div class="section">
<h3><a name="Performance"></a>Performance</h3>
<p>The time to upload is that of today&#x2019;s block upload (<tt>s3a.fast.upload=true</tt>) output stream; ongoing through the write, and in the <tt>close()</tt> operation, a delay to upload any pending data and await all outstanding uploads to complete. There wouldn&#x2019;t be any overhead of the final completion request. If no data had yet been uploaded, the <tt>close()</tt> time would be that of the initiate multipart request and the final put. This could perhaps be simplified by always requesting a multipart ID on stream creation.</p>
<p>The time to commit each task is <tt>O(files)</tt>: all <tt>.pending</tt> files in and under the task attempt directory will be listed, their contents read and then an aggregate <tt>.pendingset</tt> file PUT to the job attempt directory. The <tt>.pending</tt> files are then deleted.</p>
<p>The time to commit a job will be <tt>O(files/threads)</tt></p>
<p>Every <tt>.pendingset</tt> file in the job attempt directory must be loaded, and a PUT request issued for every incomplete upload listed in the files.</p>
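<p>That parallelization is the straightforward thread-pool pattern sketched below (<tt>PendingUpload</tt> and <tt>completeUpload()</tt> are hypothetical stand-ins for the code which completes one multipart upload):</p>
<div>
<div>
<pre class="source">void commitJob(List&lt;PendingUpload&gt; pending, int threads) throws Exception {
  ExecutorService pool = Executors.newFixedThreadPool(threads);
  try {
    List&lt;Future&lt;?&gt;&gt; results = new ArrayList&lt;&gt;();
    for (PendingUpload upload : pending) {
      results.add(pool.submit(() -&gt; completeUpload(upload)));
    }
    for (Future&lt;?&gt; result : results) {
      result.get();   // surfaces the first failure, so rollback can begin
    }
  } finally {
    pool.shutdown();
  }
}
</pre></div></div>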
<p>[Obsolete] Note that it is the bulk listing of all children which is where full consistency is required. If instead, the list of files to commit could be returned from tasks to the job committer, as the Spark commit protocol allows, it would be possible to commit data to an inconsistent object store.</p></div>
<div class="section">
<h3><a name="Cost"></a>Cost</h3>
<p>Uncommitted data in an incomplete multipart upload is billed at the storage cost of the S3 bucket. To keep costs down, outstanding data from failed jobs must be deleted. This can be done through S3 bucket lifecycle policies, or some command tools which we would need to write.</p></div>
<div class="section">
<h3><a name="Limitations_of_this_algorithm"></a>Limitations of this algorithm</h3>
<ol style="list-style-type: decimal">
<li>
<p>Files will not be visible after the <tt>close()</tt> call, as they will not exist. Any code which expected pending-commit files to be visible will fail.</p>
</li>
<li>
<p>Failures of tasks and jobs will leave outstanding multipart uploads. These will need to be garbage collected. S3 now supports automated cleanup; S3A has the option to do it on startup, and we plan for the <tt>hadoop s3</tt> command to allow callers to explicitly do it. If tasks were to explicitly write the upload ID of writes as a write commenced, cleanup by the job committer may be possible.</p>
</li>
<li>
<p>The time to write very small files may be higher than that of PUT and COPY. We are ignoring this problem as not relevant in production; any attempt at optimizing small file operations will only complicate development, maintenance and testing.</p>
</li>
<li>
<p>The files containing temporary information could be mistaken for actual data.</p>
</li>
<li>
<p>It could potentially be harder to diagnose what is causing problems. Lots of logging can help, especially with debug-level listing of the directory structure of the temporary directories.</p>
</li>
<li>
<p>To reliably list all PUT requests outstanding, we need list consistency. In the absence of a means to reliably identify when an S3 endpoint is consistent, people may still use eventually consistent stores, with the consequent loss of data.</p>
</li>
<li>
<p>If there is more than one job simultaneously writing to the same destination directories, the output may get confused. This appears to hold today with the current commit algorithms.</p>
</li>
<li>
<p>It is possible to create more than one client writing to the same destination file within the same S3A client/task, either sequentially or in parallel.</p>
</li>
<li>
<p>[Obsolete] Even with a consistent metadata store, if a job overwrites existing files, then old data may still be visible to clients reading the data, until the update has propagated to all replicas of the data.</p>
</li>
<li>
<p>If the operation is attempting to completely overwrite the contents of a directory, then it is not going to work: the existing data will not be cleaned up. A cleanup operation would need to be included in the job commit, deleting all files in the destination directory which were not being overwritten.</p>
</li>
<li>
<p>It requires a path element, such as <tt>__magic</tt> which cannot be used for any purpose other than for the storage of pending commit data.</p>
</li>
<li>
<p>Unless extra code is added to every FS operation, it will still be possible to manipulate files under the <tt>__magic</tt> tree. That&#x2019;s not bad, just potentially confusing.</p>
</li>
<li>
<p>As written data is not materialized until the commit, it will not be possible for any process to read or manipulate a file which it has just created.</p>
</li>
</ol></div>
<div class="section">
<h3><a name="Changes_to_S3ABlockOutputStream"></a>Changes to <tt>S3ABlockOutputStream</tt></h3>
<p>To avoid having to copy and paste the <tt>S3ABlockOutputStream</tt>, it has been modified to be constructed with a <tt>PutTracker</tt> class to manage the immediate/delayed completion of uploads. It will be called at appropriate points.</p>
<ul>
<li>Initialization, returning a marker to indicate whether or not the multipart upload is to commence immediately.</li>
<li>Multipart PUT init.</li>
<li>Single put init (not used in this algorithm, but useful for completeness).</li>
<li>Block upload init, failure and completion (from the relevant thread).</li>
<li><tt>close()</tt> entered; all blocks completed &#x2014;returning a marker to indicate whether any outstanding multipart should be committed.</li>
<li>Multipart abort in <tt>abort()</tt> call (maybe: move core logic elsewhere).</li>
</ul>
<p>The base implementation would do nothing except declare that the MPU must be executed in the <tt>close()</tt> call.</p>
<p>The S3A committer version would:</p>
<ol style="list-style-type: decimal">
<li>Always initiate a multipart upload during initialization.</li>
<li>In the <tt>close()</tt> operation, save all the data required to commit later.</li>
</ol>
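<p>A hypothetical shape for such a tracker, mirroring the callback points listed above (the names here are illustrative, not the actual <tt>PutTracker</tt> API):</p>
<div>
<div>
<pre class="source">interface UploadTracker {

  /** Stream initialization; true means a multipart upload should start immediately. */
  boolean initialize() throws IOException;

  /** A multipart upload has been initiated. */
  void multipartInitiated(String uploadId) throws IOException;

  /** A block upload has completed, with its part number and etag. */
  void partCompleted(int partNumber, String etag) throws IOException;

  /**
   * close() entered and all blocks uploaded.
   * @return true if the stream should complete the multipart upload itself,
   * false if completion is deferred (the magic committer case).
   */
  boolean aboutToComplete(String uploadId, List&lt;String&gt; etags, long bytesWritten)
      throws IOException;

  /** abort() called: cancel any outstanding multipart upload. */
  void aborted(String uploadId) throws IOException;
}
</pre></div></div></div></div>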
<div class="section">
<h2><a name="Integrating_the_Committers_with_Hadoop_MapReduce"></a>Integrating the Committers with Hadoop MapReduce</h2>
<p>In order to support the ubiquitous <tt>FileOutputFormat</tt> and subclasses, S3A committers will need to somehow be accepted as a valid committer by the class, a class which explicitly expects the output committer to be a <tt>FileOutputCommitter</tt>:</p>
<div>
<div>
<pre class="source">public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException{
PathOutputCommitter committer =
(PathOutputCommitter) getOutputCommitter(context);
return new Path(committer.getWorkPath(), getUniqueFile(context,
getOutputName(context), extension));
}
</pre></div></div>
<p>Here are some options which have been considered, explored and discarded</p>
<ol style="list-style-type: decimal">
<li>
<p>Adding more of a factory mechanism to create <tt>FileOutputCommitter</tt> instances; subclass this for S3A output and return it. The complexity of <tt>FileOutputCommitter</tt> and of supporting more dynamic construction makes this dangerous from an implementation and maintenance perspective.</p>
</li>
<li>
<p>Add a new commit algorithm &#x201c;3&#x201d;, which actually reads in the configured classname of a committer which it then instantiates and then relays the commit operations to, passing in context information. The new committer interface would need to add the extra methods and attributes required. This is viable, but does still change the existing committer code in a way which may be high-maintenance.</p>
</li>
<li>
<p>Allow the <tt>FileOutputFormat</tt> class to take any task/job context committer which implemented the <tt>getWorkPath()</tt> method &#x2014;that being the sole specific feature which it needs from the <tt>FileOutputCommitter</tt>.</p>
</li>
</ol>
<p>Option 3, make <tt>FileOutputFormat</tt> support more generic committers, is the current design. It relies on the fact that the sole specific method of <tt>FileOutputCommitter</tt> which <tt>FileOutputFormat</tt> uses is <tt>getWorkPath()</tt>.</p>
<p>This can be pulled up into a new abstract class, <tt>PathOutputCommitter</tt>, which <tt>FileOutputCommitter</tt> and <tt>S3ACommitter</tt> can implement:</p>
<div>
<div>
<pre class="source">public abstract class PathOutputCommitter extends OutputCommitter {
/**
* Get the directory that the task should write results into.
* @return the work directory
*/
public abstract Path getWorkPath() throws IOException;
}
</pre></div></div>
<p>The sole change needed for <tt>FileOutputFormat</tt> is to change what it casts the context committer to:</p>
<div>
<div>
<pre class="source">PathOutputCommitter committer =
(PathOutputCommitter) getOutputCommitter(context);
</pre></div></div>
<p>Provided that <tt>getWorkPath()</tt> remains the sole method which <tt>FileOutputFormat</tt> uses, these changes will allow an S3A committer to replace the <tt>FileOutputCommitter</tt>, with minimal changes to the codebase.</p>
<p>Update: There is a cost to this: MRv1 API support is lost.</p>
<div class="section">
<h3><a name="MRv1_support_via_org.apache.hadoop.mapred.FileOutputFormat"></a>MRv1 support via <tt>org.apache.hadoop.mapred.FileOutputFormat</tt></h3>
<p>A price of not subclassing <tt>FileOutputCommitter</tt> is that the code used to wrap and relay the MRv1 API calls protocol to the <tt>FileOutputCommitter</tt> will not work: the new committer will not be picked up.</p>
<p>This is visible in Spark, where the V1 API is exported from the <tt>RDD</tt> class (<tt>RDD.saveAsHadoopFile()</tt>). The successor code, <tt>PairRDDFunctions.saveAsNewAPIHadoopFile()</tt>, does work: <i>to get high-performance commits in object stores, the MRv2 commit protocol must be used, which means the V2 classes.</i></p>
<div class="section">
<h4><a name="Resolved_issues"></a>Resolved issues</h4>
<p><b>Magic Committer: Name of directory</b></p>
<p>The design proposes the name <tt>__magic</tt> for the directory. HDFS and the various scanning routines always treat files and directories starting with <tt>_</tt> as temporary/excluded data.</p>
<p><b>Magic Committer: Subdirectories</b></p>
<p>It is legal to create subdirectories in a task work directory, which will then be moved into the destination directory, retaining that directory tree.</p>
<p>That is, if the task working dir is <tt>dest/__magic/app1/task1/</tt>, all files under <tt>dest/__magic/app1/task1/part-0000/</tt> must end up under the path <tt>dest/part-0000/</tt>.</p>
<p>This behavior is relied upon for the writing of intermediate map data in an MR job.</p>
<p>This means it is not simply enough to strip off all elements under <tt>__magic</tt>; it is critical to determine the base path.</p>
<p>Proposed: use the special name <tt>__base</tt> as a marker of the base element for committing. Under task attempts a <tt>__base</tt> dir is created and turned into the working dir. All files created under this path will be committed to the destination with a path relative to the base dir.</p>
<p>More formally: the last parent element of a path which is <tt>__base</tt> sets the base for relative paths created underneath it.</p></div></div></div>
<div class="section">
<h2><a name="Testing"></a>Testing</h2>
<p>The committers can only be tested against an S3-compatible object store.</p>
<p>The committers have some unit tests, and integration tests based on the protocol integration test lifted from <tt>org.apache.hadoop.mapreduce.lib.output.TestFileOutputCommitter</tt> to test various state transitions of the commit mechanism; this has been extended to support the variants of the staging committer.</p>
<p>There is an abstract integration test, <tt>AbstractITCommitMRJob</tt>, which creates a MiniYARN cluster bonded to a MiniHDFS cluster, then submits a simple MR job using the relevant committer. This verifies that the committer actually works, rather than just &#x201c;appears to follow the protocol&#x201d;.</p>
<p>One feature added during this testing is that the <tt>_SUCCESS</tt> marker file saved is no longer a 0-byte file; it is a JSON manifest file, as implemented in <tt>org.apache.hadoop.fs.s3a.commit.files.SuccessData</tt>. This file includes the committer used, the hostname performing the commit, timestamp data and a list of paths committed.</p>
<div>
<div>
<pre class="source">SuccessData{
committer='PartitionedStagingCommitter',
hostname='devbox.local',
description='Task committer attempt_1493832493956_0001_m_000000_0',
date='Wed May 03 18:28:41 BST 2017',
filenames=[/test/testMRJob/part-m-00000, /test/testMRJob/part-m-00002, /test/testMRJob/part-m-00001]
}
</pre></div></div>
<p>This was useful as a means of verifying that the correct committer had in fact been invoked in those forked processes: a 0-byte <tt>_SUCCESS</tt> marker implied the classic <tt>FileOutputCommitter</tt> had been used; if it could be read, then it provided some details on the commit operation which are then used in assertions in the test suite.</p>
<p>It has since been extended to collect metrics and other values, and has proven equally useful in Spark integration testing.</p></div>
<div class="section">
<h2><a name="Integrating_the_Committers_with_Apache_Spark"></a>Integrating the Committers with Apache Spark</h2>
<p>Spark defines a commit protocol, <tt>org.apache.spark.internal.io.FileCommitProtocol</tt>, implementing it in <tt>HadoopMapReduceCommitProtocol</tt>, in a subclass <tt>SQLHadoopMapReduceCommitProtocol</tt> which supports the configurable declaration of the underlying Hadoop committer class, and in the <tt>ManifestFileCommitProtocol</tt> for Structured Streaming. The latter is best defined as &#x201c;a complication&#x201d; &#x2014;but without support for it, S3 cannot be used as a reliable destination of stream checkpoints.</p>
<p>One aspect of the Spark commit protocol is that alongside the Hadoop file committer, there&#x2019;s an API to request an absolute path as a target for a commit operation, <tt>newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String</tt>; each task&#x2019;s mapping of temp-&gt; absolute files is passed to the Spark driver in the <tt>TaskCommitMessage</tt> returned after a task performs its local commit operations (which includes requesting permission to commit from the executor). These temporary paths are renamed to the final absolute paths in <tt>FileCommitProtocol.commitJob()</tt>. This is currently a serialized rename sequence at the end of all other work. This use of absolute paths is used in writing data into a destination directory tree whose directory names are driven by partition names (year, month, etc).</p>
<p>Supporting that feature is going to be challenging; either we allow each directory in the partition tree to have its own staging directory documenting pending PUT operations, or (better) a staging directory tree is built off the base path, with all pending commits tracked in a matching directory tree.</p>
<p>Alternatively, the fact that Spark tasks provide data to the job committer on their completion means that a list of pending PUT commands could be built up, with the commit operations being executed by an S3A-specific implementation of the <tt>FileCommitProtocol</tt>.</p>
<p>[Obsolete] As noted earlier, this may permit the requirement for a consistent list operation to be bypassed. It would still be important to list what was being written, as it is needed to aid aborting work in failed tasks, but the list of files created by successful tasks could be passed directly from the task to committer, avoid that potentially-inconsistent list.</p>
<div class="section">
<div class="section">
<h4><a name="Spark.2C_Parquet_and_the_Spark_SQL_Commit_mechanism"></a>Spark, Parquet and the Spark SQL Commit mechanism</h4>
<p>Spark&#x2019;s <tt>org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat</tt> Parquet output format wants a subclass of <tt>org.apache.parquet.hadoop.ParquetOutputCommitter</tt>, the option being defined by the classname in the configuration key <tt>spark.sql.parquet.output.committer.class</tt>; this is then patched in to the value <tt>spark.sql.sources.outputCommitterClass</tt> where it is picked up by <tt>SQLHadoopMapReduceCommitProtocol</tt> and instantiated as the committer for the work.</p>
<p>This is presumably done so the user has the option of requesting a metadata summary file by setting the option <tt>&quot;parquet.enable.summary-metadata&quot;</tt>. Creating the summary file requires scanning every single file in the destination directory on the job commit, so is <i>very</i> expensive, and not something which we recommend when working with S3.</p>
<p>To use an S3Guard committer, it must also be identified as the Parquet committer. The fact that instances are dynamically instantiated somewhat complicates the process.</p>
<p>In early tests, we could switch committers for ORC output without making any changes to the Spark code or configuration other than configuring the factory for Path output committers. For Parquet support, it may be sufficient to also declare the classname of the specific committer (i.e. not the factory).</p>
<p>This is unfortunate as it complicates dynamically selecting a committer protocol based on the destination filesystem type or any per-bucket configuration.</p>
<p>The solution as implemented in the <a class="externalLink" href="https://github.com/hortonworks-spark/cloud-integration">initial prototype</a> consists of three things</p>
<ol style="list-style-type: decimal">
<li>
<p>A class <tt>PathOutputCommitProtocol extends HadoopMapReduceCommitProtocol</tt> which always creates the committer using the <tt>PathOutputCommitterFactory</tt> mechanism. This ensures that output format&#x2019;s own committers are replaced with an output factory mechanism.</p>
</li>
<li>
<p>A class <tt>org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter</tt> which is a directly instantiable output committer that then creates a committer through the factory mechanism and delegates all operations to that committer. This allows a committer to be declared in any configuration option which takes a committer class, but still use the factory mechanism underneath.</p>
</li>
<li>
<p>Add a patch to Spark 2.3 [SPARK-21762], which allows any output committer to be used for <tt>ParquetFileOutput</tt>.</p>
</li>
</ol>
<p>Overall, it&#x2019;s a bit convoluted to implement, document and use. Users must declare two Spark SQL options as well as three <tt>spark.hadoop</tt> ones:</p>
<div>
<div>
<pre class="source">spark.sql.sources.commitProtocolClass=com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class=org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.hadoop.fs.s3a.committer.name=magic
spark.hadoop.fs.s3a.committer.magic.enabled=true
</pre></div></div>
<p>We could actually simplify this by adding a new algorithm &#x201c;3&#x201d; to the existing <tt>FileOutputFormat</tt>, telling <tt>FileOutputFormat</tt> itself to use the factory to create committers. This would then automatically pick up the factory. Avoiding loops in this situation would be &#x201c;challenging&#x201d;: if instantiated via a factory, the file committer must not attempt to use the factory itself.</p></div></div></div>
<div class="section">
<h2><a name="Security"></a>Security</h2>
<p>What are the obvious possible security risks which need to be covered and which code reviews should check for?</p>
<ul>
<li>Leakage of AWS credentials in jobs/logging.</li>
<li>Exposure of data to unauthorized users.</li>
<li>Exposure of workflow to unauthorized users (e.g. paths &amp; filenames, sequence of queries).</li>
<li>Silent tampering of data by unauthed users.</li>
<li>Attacks on jobs which run up (large) bills by leaking pending datasets.</li>
<li>DoS attacks with malicious users sabotaging/hindering progress of other users&#x2019; work.</li>
</ul>
<div class="section">
<div class="section">
<h4><a name="Security_Risks_of_the_Staging_Committers"></a>Security Risks of the Staging Committers</h4>
<p>Staged data is on local FS, in directories listed by <tt>fs.s3a.buffer.dir</tt>, falling back to the directory in <tt>hadoop.tmp.dir</tt>. These are the same directories already used to buffer blocks of data being written in output streams, prior to PUT/POST to S3. Therefore: there is more risk than before. We should clearly document the security aspects of the temp dirs to ensure this.</p>
<p>As all files written by a task are not uploaded until task commit, more local storage is needed. A malicious user executing work on one system could potentially fill up the temp disk space. Mitigation: storage quotas in local FS, keeping temp dirs on different mounted FS from root.</p>
<p>The intermediate <tt>.pendingset</tt> files are saved in HDFS under the directory in <tt>fs.s3a.committer.staging.tmp.path</tt>; defaulting to <tt>/tmp</tt>. This data can disclose the workflow (it contains the destination paths &amp; amount of data generated), and if deleted, breaks the job. If malicious code were to edit the file, by, for example, reordering the ordered etag list, the generated data would be committed out of order, creating invalid files. As this is the (usually transient) cluster FS, any user in the cluster has the potential to do this.</p>
<p><i>Locking down the temporary directories is critical to prevent malicious cluster users damaging the workflow and output.</i></p></div>
<div class="section">
<h4><a name="Security_Risks_of_the_Magic_Committer"></a>Security Risks of the Magic Committer</h4>
<p>The directory defined by <tt>fs.s3a.buffer.dir</tt> is used to buffer blocks before upload, unless the job is configured to buffer the blocks in memory. This is as before: no incremental risk. As blocks are deleted from the filesystem after upload, the amount of storage needed is determined by the data generation bandwidth and the data upload bandwidth.</p>
<p>No use is made of the cluster filesystem; there are no risks there.</p>
<p>A malicious user with write access to the <tt>__magic</tt> directory could manipulate or delete the metadata of pending uploads, or potentially inject new work into the commit. However, having access to the <tt>__magic</tt> directory implies write access to the parent destination directory: a malicious user could just as easily manipulate the final output, without needing to attack the committer&#x2019;s intermediate files.</p></div></div>
<div class="section">
<h3><a name="Security_Risks_of_all_committers"></a>Security Risks of all committers</h3>
<div class="section">
<h4><a name="Visibility"></a>Visibility</h4>
<p>[Obsolete] If S3Guard is used for storing metadata, then the metadata is visible to all users with read access. A malicious user with write access could delete entries of newly generated files, so they would not be visible.</p></div>
<div class="section">
<h4><a name="Malicious_Serialized_Data"></a>Malicious Serialized Data</h4>
<p>The internal metadata summary files (<tt>.pending</tt> and <tt>.pendingset</tt>) could be tampered with by a malicious party and, when parsed or interpreted by the trusted account, used to execute untrusted code, trigger failures, etc. The formats are all JSON, parsed with Jackson; we expect invalid JSON to result in parse errors, and fail the job. Aborting the job triggers a best-effort attempt to load the pending files, ignoring those which cannot be loaded or parsed, and aborts all pending uploads identified by the loaded files.</p>
<ul>
<li>
<p>None of the strings in the parsed dataset are passed through any interpreter (e.g. used in SQL queries, shell commands or similar). Therefore (SQL) injection attacks are not possible.</p>
</li>
<li>
<p>Some of the data <i>may</i> be printed in the log files generated during process execution: for example, the commit ID, the destination key, etc. These are all treated as plain text, and should be served up as such in any browser-hosted view of the logs.</p>
</li>
<li>
<p>Some of the data is returned in the <tt>toString()</tt> values of the loaded classes. This may also be logged or observed in IDEs.</p>
</li>
<li>
<p>None of the content in the serialized data is displayed in any web UI. The vulnerability is what could happen if a downstream application were to do so in the future: would it be possible to inject JavaScript into any of the text fields, script which could then be executed in an XSS attack? We may wish to consider sanitizing this data on load.</p>
</li>
<li>
<p>Paths in tampered data could be modified in an attempt to commit an upload across an existing file, or the MPU ID altered to prematurely commit a different upload. These attempts will not succeed, because the destination path of the upload is declared on the initial POST to initiate the MPU, and operations associated with the MPU must also declare the path: if the path and ID are inconsistent, the operation will fail. If a valid (path, MPU) tuple were inserted in a tampered-with file, the commit or abort could complete or abort that upload prematurely. As the malicious party must already have enough access to the target S3 store to obtain this information, they are unlikely to need to tamper with the JSON files to perform such actions.</p>
</li>
</ul></div>
<div class="section">
<h4><a name="Outstanding_uncommitted_data_and_its_cost"></a>Outstanding uncommitted data and its cost</h4>
<ul>
<li>Killed jobs will leak uncommitted uploads, which will run up bills. However, a job automatically purges all pending uploads under its destination path at job commit, so if the job is rerun, the pending writes of the previous job attempt are cancelled.</li>
</ul>
<p>We shall also provide a CLI tool to list and delete pending uploads under a path.</p>
<ul>
<li>Configuring a store to automatically clean up pending uploads after a time period such as 24 hours guarantees that pending upload data will always be deleted, even without a rerun of the job or use of the CLI tool (see the example below).</li>
</ul>
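<p>A sketch of both approaches. The <tt>hadoop s3guard uploads</tt> subcommand ships with the S3A connector (check the usage text of your release for the exact flags); store-side cleanup uses the standard S3 <tt>AbortIncompleteMultipartUpload</tt> lifecycle action, applied here with the AWS CLI. Bucket and path names are placeholders:</p>
<div class="source">
<div class="source">
<pre class="source"># List pending multipart uploads under a destination path.
hadoop s3guard uploads -list s3a://example-bucket/output/

# Abort them; -force skips the confirmation prompt.
hadoop s3guard uploads -abort -force s3a://example-bucket/output/

# Store-side cleanup: abort incomplete uploads a day after initiation.
# Rule JSON saved as abort-mpu.json:
#   {"Rules": [{"ID": "abort-incomplete-mpu", "Status": "Enabled",
#     "Filter": {"Prefix": ""},
#     "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 1}}]}
aws s3api put-bucket-lifecycle-configuration \
  --bucket example-bucket \
  --lifecycle-configuration file://abort-mpu.json
</pre></div></div>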
<p>AWS documents <a class="externalLink" href="http://docs.aws.amazon.com/AmazonS3/latest/dev/mpuAndPermissions.html">the permissions for multipart uploads</a>. There are special permissions <tt>s3:ListBucketMultipartUploads</tt>, <tt>s3:ListMultipartUploadParts</tt> and <tt>s3:AbortMultipartUpload</tt>. By default the bucket owner has all of these permissions; the MPU initiator is granted the permissions to list the upload, list the parts and abort their own uploads. The bucket owner may grant or deny these permissions to either party (which may, for example, permit a user to initiate and complete an MPU, but not to abort one).</p></div>
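<p>As an illustration only (the bucket name and prefix are placeholders, and this is not a complete policy for running the committers), an IAM policy granting a principal the multipart-upload permissions described above might look like:</p>
<div class="source">
<div class="source">
<pre class="source">{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "MultipartUploadOnOutputPrefix",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:ListMultipartUploadParts",
        "s3:AbortMultipartUpload"
      ],
      "Resource": "arn:aws:s3:::example-bucket/output/*"
    },
    {
      "Sid": "ListPendingUploadsOnBucket",
      "Effect": "Allow",
      "Action": "s3:ListBucketMultipartUploads",
      "Resource": "arn:aws:s3:::example-bucket"
    }
  ]
}
</pre></div></div>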
<div class="section">
<h4><a name="Proposed_security_settings_.26_recommendations"></a>Proposed security settings &amp; recommendations</h4>
<ul>
<li>Bucket access restricted to specific IAM roles.</li>
<li><tt>fs.s3a.buffer.dir</tt> set to a location under <tt>/tmp</tt> with read &amp; write access restricted to the active user.</li>
<li><tt>fs.s3a.committer.staging.tmp.path</tt> should be isolated to each active user. Proposed: make the default an unqualified path, <tt>tmp/staging</tt>, which will be made absolute relative to the current user&#x2019;s home directory. On filesystems in which access under users&#x2019; home directories is restricted, this final, absolute path will not be visible to untrusted accounts. (See the example configuration after this list.)</li>
<li>
<p>Maybe: define the set of valid characters for free-text strings, along with a regular expression to validate them, e.g. <tt>[a-zA-Z0-9 \.\,\(\) \-\+]+</tt>, and then validate all free-text JSON fields on load and save.</p>
</li>
</ul></div></div></div>
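<p>A sketch of the buffer and staging settings from the list above (the values are examples only; <tt>${user.name}</tt> relies on standard Hadoop configuration variable substitution from Java system properties):</p>
<div class="source">
<div class="source">
<pre class="source"># Local buffering under a per-user directory with restricted permissions.
fs.s3a.buffer.dir=/tmp/${user.name}/s3a
# Unqualified cluster-FS staging path, resolved relative to the user's home directory.
fs.s3a.committer.staging.tmp.path=tmp/staging
</pre></div></div>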
</div>
</div>
<div class="clear">
<hr/>
</div>
<div id="footer">
<div class="xright">
&#169; 2008-2021
Apache Software Foundation
- <a href="http://maven.apache.org/privacy-policy.html">Privacy Policy</a>.
Apache Maven, Maven, Apache, the Apache feather logo, and the Apache Maven project logos are trademarks of The Apache Software Foundation.
</div>
<div class="clear">
<hr/>
</div>
</div>
</body>
</html>