MAPREDUCE-2040. Forrest Documentation for Dynamic Priority Scheduler. Contributed by Thomas Sandholm.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1004886 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8746949..eefaf17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -324,6 +324,11 @@
Release 0.21.1 - Unreleased
+ NEW FEATURES
+
+ MAPREDUCE-2040. Forrest Documentation for Dynamic Priority Scheduler.
+ (Thomas Sandholm via tomwhite)
+
BUG FIXES
MAPREDUCE-1897. trunk build broken on compile-mapred-test (cos)
diff --git a/src/docs/src/documentation/content/xdocs/dynamic_scheduler.xml b/src/docs/src/documentation/content/xdocs/dynamic_scheduler.xml
new file mode 100644
index 0000000..be653c3
--- /dev/null
+++ b/src/docs/src/documentation/content/xdocs/dynamic_scheduler.xml
@@ -0,0 +1,456 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+
+ <header>
+ <title>Dynamic Priority Scheduler</title>
+ </header>
+
+ <body>
+
+ <section>
+ <title>Purpose</title>
+
+ <p>This document describes the Dynamic Priority Scheduler, a pluggable
+ MapReduce scheduler for Hadoop that automatically manages user job QoS
+ based on demand.</p>
+ </section>
+
+ <section>
+ <title>Features</title>
+
+ <p>The Dynamic Priority Scheduler supports the following features:</p>
+ <ul>
+ <li>
+ Users may change their dedicated queue capacity at any point in time by specifying how much
+ they are willing to pay for running on a (map or reduce) slot over an allocation interval
+ (called the spending rate).
+ </li>
+ <li>
+ Users are given a budget when signing up to use the Hadoop cluster. This budget may then be used
+ to run jobs requiring different QoS over time. QoS allocation intervals on the order of a few
+ seconds are supported, allowing cluster capacity allocations to follow demand closely.
+ </li>
+ <li>
+ The slot capacity share given to a user is proportional to the user's spending rate
+ and inversely proportional to the spending rates of all the other users in the
+ same allocation interval.
+ </li>
+ <li>
+ Preemption is supported but may be disabled.
+ </li>
+ <li>
+ Work conservation is supported. If a user doesn't use up the slots guaranteed
+ based on the spending rates, other users may use these slots. Whoever uses the
+ slots pays for them but nobody pays more than the spending rate they bid.
+ </li>
+ <li>
+ When you don't run any jobs you don't pay anything out of your budget.
+ </li>
+ <li>
+ When preempting tasks, the tasks that have been running for the shortest
+ period of time will be killed first.
+ </li>
+ <li>
+ A secure REST XML RPC interface allows programmatic access to queue management
+ for both users and administrators. The same access control mechanism has also
+ been integrated into the standard Hadoop job client for submitting jobs to personal queues.
+ </li>
+ <li>
+ A simple file-based budget management back-end is provided but database backends
+ may be plugged in as well.
+ </li>
+ <li>
+ The scheduler is very lightweight in terms of queue memory footprint and overhead,
+ so a large number of concurrent queues (100+) can be served.
+ </li>
+ </ul>
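The proportional-share rule in the list above can be sketched as follows (an illustrative model, not the scheduler's source; the function and variable names are hypothetical):

```python
# Illustrative sketch of the proportional-share rule described above.
# Not the scheduler's actual code; names are hypothetical.

def slot_shares(spending_rates):
    """Return each user's fractional slot share for one allocation interval.

    A share is proportional to the user's own spending rate and inversely
    proportional to the sum of all users' rates in the same interval.
    """
    total = sum(spending_rates.values())
    if total == 0:
        # Nobody is bidding: no guaranteed shares to hand out.
        return {user: 0.0 for user in spending_rates}
    return {user: rate / total for user, rate in spending_rates.items()}

shares = slot_shares({"alice": 1.0, "bob": 3.0})
# alice is granted 25% of the slots, bob 75%
```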
+ </section>
+
+ <section>
+ <title>Problem Addressed</title>
+
+ <p>Granting users (here MapReduce job clients) capacity shares on computational clusters is
+ typically done manually by the system administrator of the Hadoop installation. If users
+ want more resources they need to renegotiate their share either with the administrator or
+ the competing users. This form of social scheduling works fine in small teams in trusted
+ environments, but breaks down in large-scale, multi-tenancy clusters with users from
+ multiple organizations.</p>
+
+ <p>Even if the users are cooperative, it is too complex to renegotiate
+ shares manually. If users' individual jobs vary in importance (criticality of meeting certain
+ deadlines) over time and between job types, severe allocation inefficiencies may occur.
+ For example, a user with a high allocated capacity may run large low-priority test jobs
+ starving out more important jobs from other users.</p>
+
+ </section>
+ <section>
+ <title>Approach</title>
+ <p>To solve this problem we introduce the concept of dynamic regulated priorities. As
+ before, each user is granted a certain initial quota, which we call a budget. However, instead
+ of mapping the budget to a capacity share statically we allow users to specify how much
+ of their budget they are willing to spend on each job at any given point in time. We call
+ this amount the spending rate, and it is defined in units of the budget that a user is willing
+ to spend per task per allocation interval (typically < 1 minute but configurable).</p>
+
+ <p>The share allocated to a specific user is calculated as her spending rate over the sum of
+ the spending rates of all users. The scheduler preempts tasks to guarantee the shares, but it is at the
+ same time work conserving (it lets users exceed their share if no other tasks are running).
+ Furthermore, users are only charged for the fraction of the shares they actually use, so
+ if they don't run any jobs their spending goes to 0. In this text a user is defined as the
+ owner of a queue, which is the unit of Authentication, Authorization, and Accounting (AAA). If
+ there is no need to separate actual users' AAA,
+ then they could share a queue, but that should be seen as equivalent to users sharing a password,
+ which in general is frowned upon.</p>
+ </section>
+ <section>
+ <title>Comparison to other Hadoop Schedulers</title>
+ <p>Hadoop-on-demand (HOD) allows individual users to create private MapReduce clusters
+ on demand to meet their changing demands over time. Prioritization across users is still a
+ static server configuration, which in essence still relies on users being cooperative during
+ high-demand periods. Further, HOD breaks the data-local scheduling principle of
+ MapReduce and makes it more difficult to efficiently share large data sets.</p>
+
+ <p>The fair-share (FAIR) and capacity (CAP) schedulers are both extensions of a simple
+ FIFO queue that allow relative priorities to be set on queues. There is no notion of charging
+ or accounting on a per-use basis, and shares cannot be set by the users (they have to be
+ negotiated a priori). So although sharing data is more efficient than with HOD, these
+ schedulers suffer from the same kind of social scheduling inefficiencies.</p>
+
+ <p>Amazon Web Services (AWS) Elastic MapReduce allows virtual Hadoop clusters, which
+ read input from and write output to S3, to be set up automatically. New instances can be
+ added to or removed from the cluster to scale the jobs up or down. Our approach is more
+ lightweight in that the virtualization is not on an OS level but on a queue level. This
+ allows us to react faster to temporary application bursts and to co-host more concurrent
+ users on each physical node. Furthermore, our approach implements a demand-based
+ pricing model whereas AWS uses fixed pricing, and our solution works in any
+ Hadoop cluster, not only in an AWS environment. Finally, with AWS you can get up and
+ running and reconfigure on the order of 10 minutes; with this scheduler you can get
+ started and reconfigure within seconds (configured via alloc-interval below).</p>
+ </section>
+ <section>
+ <title>Implementation and Use</title>
+ <section>
+ <title>Accounting</title>
+ <p>Users specify a spending rate. This is the rate deducted for each active/used task slot per
+ allocation interval. The allocated shares are not calculated directly from the user-specified
+ spending rate but from the effective rate that is currently being paid by each user.
+ If a user has no pending or running jobs the effective spending rate for that user is set to
+ 0, which ensures that the user is not charged anything against her budget. If a job is
+ pending the effective rate is set to the user-specified rate to allow a share to be allocated
+ so the job can be moved from the pending to the active state.</p>
+
+ <p> The number of map tasks and
+ reduce tasks granted to a user in each allocation interval is based on the effective
+ spending rate in the previous allocation interval. If the user only uses a subset of the
+ allocated tasks, only that subset is charged for. Conversely, if the user is able to run
+ more tasks than the granted quota because other users are not running up to their full quota,
+ only the spending rate times the quota is charged for. The price signaled to users, which lets
+ them estimate shares accurately, is based on the effective rates.</p>
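As a simplified sketch of the charging rule above (hypothetical names, not the scheduler's code), a user is billed her spending rate per slot actually used, capped at the granted quota:

```python
def interval_charge(spending_rate, granted_slots, used_slots):
    """Budget deducted for one allocation interval (simplified sketch).

    Only the slots actually used are billed, and never more than the
    granted quota: unused granted slots cost nothing, and slots run
    beyond the quota (work conservation) are not billed above
    spending_rate * granted_slots.
    """
    return spending_rate * min(used_slots, granted_slots)

# Uses only 3 of 10 granted slots: billed for 3.
charge_a = interval_charge(0.5, granted_slots=10, used_slots=3)   # 1.5
# Work-conserving overshoot: 15 slots used, billed for at most 10.
charge_b = interval_charge(0.5, granted_slots=10, used_slots=15)  # 5.0
```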
+ </section>
+ <section>
+ <title>Preemption</title>
+ <p>Tasks that are run in excess of the quota allocated to a particular user are subject to
+ preemption. No preemption will occur unless there are pending tasks for a user who has
+ not received her full quota. Queues (users) to preempt are picked based on the latest
+ start time of jobs in excess of their quota.</p>
+
+ <p>Excess jobs in a queue will not be killed unless all
+ excess jobs have been killed from queues with later start times. Within a queue that is
+ exceeding its quota, the tasks that have run for the shortest time are killed first. All killed
+ tasks are put back in a pending state and will thus be resubmitted as soon as existing tasks
+ finish or the effective share for the queue goes up, e.g. because other users' jobs finish.
+ The preemption interval may be set equal to the scheduling interval, to a longer interval, or
+ to 0, in which case no preemption (task killing) is done.</p>
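The victim-selection order described above can be sketched as follows (an illustrative model; the field names are hypothetical): excess tasks from the most recently started jobs are killed first, and within the same job start time the shortest-running tasks go first.

```python
def preemption_order(excess_tasks):
    """Order excess tasks for killing (illustrative sketch only).

    Each task is a dict carrying the start time of its job and its own
    runtime. Tasks belonging to the most recently started excess jobs
    are killed first; among those, the tasks that have run for the
    shortest time are killed first (minimizing lost work).
    """
    return sorted(excess_tasks, key=lambda t: (-t["job_start"], t["runtime"]))

tasks = [
    {"id": "a", "job_start": 100, "runtime": 50},
    {"id": "b", "job_start": 200, "runtime": 5},
    {"id": "c", "job_start": 200, "runtime": 30},
]
order = [t["id"] for t in preemption_order(tasks)]  # b, then c, then a
```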
+ </section>
+ <section>
+ <title>Security</title>
+ <p>If any user can impersonate any other user, or if any user can submit to any queue, the
+ economic incentives of the mechanism in the scheduler break down and the problems of
+ social scheduling and free-riding users come back. Hence, stronger authentication and
+ authorization are required in this scheduler compared to the other schedulers. Since the
+ authorization mechanisms are still being worked out in Hadoop, we implemented a simple
+ shared-secret signature-based protocol inspired by the AWS query (REST) API authentication
+ used for EC2 services.</p>
+
+ <p>Simple replay-attack guards based on clock/nonce
+ synchronization are also available, but the main idea is to require that users prove possession
+ of a secret key to be allowed to submit jobs or control queue spending rates. There are
+ currently two roles implemented: users and administrators. In general, users have access
+ to information such as the current price (the aggregate spending rate of active queues) of the
+ cluster and their own queue settings and usage, and can change the spending rate of their
+ queue. Admin users may also add budgets to queues, create and remove queues, and get
+ detailed usage info for all queues.</p>
+
+ <p>The basic authentication model relies on the clients
+ calculating an HMAC/SHA1 signature across their request input, a timestamp,
+ and their user name, using their secret key. The server looks up the user's secret key in an
+ ACL and determines the granted role after verifying that the signature is valid. For the REST
+ API discussed next, the signature is passed in the standard HTTP Authorization header;
+ for the job submission protocol it is carried in a job configuration parameter
+ (mapred.job.signature).</p>
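A server-side check along these lines might look as follows (a Python 3 sketch under assumptions; `verify` and the ACL shape are hypothetical, not the scheduler's actual classes):

```python
import base64
import hashlib
import hmac

def verify(query_path, user, timestamp, presented_sig, acl):
    """Check a request signature against the user's secret key in the ACL.

    The signed string follows the scheme described in this document:
    "[query path]&user=[user]&timestamp=[timestamp]".
    """
    key = acl.get(user)
    if key is None:
        return False  # unknown user
    data = "%s&user=%s&timestamp=%d" % (query_path, user, timestamp)
    expected = base64.b64encode(
        hmac.new(key.encode(), data.encode(), hashlib.sha1).digest()
    ).decode()
    # Constant-time comparison avoids leaking signature prefixes.
    return hmac.compare_digest(expected, presented_sig)
```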
+
+ <p>If stronger authentication protocols (asynchronous keys) are developed for Hadoop at
+ least the job submission protocol will be adopted to use it.</p>
+ </section>
+ <section>
+ <title>Control</title>
+ <p>The job tracker installs a servlet accepting signed HTTP GETs and returning well-formed
+ XML responses. The servlet is installed in the scheduler context (like the fair-share
+ scheduler UI).</p>
+ <p>It is installed at [job tracker URL]/scheduler.</p>
+
+ <table>
+ <tr>
+ <th>HTTP Option</th>
+ <th>Description</th>
+ <th>Authz Required</th>
+ </tr>
+ <tr>
+ <td>price</td>
+ <td>Gets current price</td>
+ <td>None</td>
+ </tr>
+ <tr>
+ <td>time</td>
+ <td>Gets current time</td>
+ <td>None</td>
+ </tr>
+ <tr>
+ <td>info=<code>queue</code></td>
+ <td>Gets queue usage info</td>
+ <td>User</td>
+ </tr>
+ <tr>
+ <td>info</td>
+ <td>Gets queue usage info</td>
+ <td>User</td>
+ </tr>
+ <tr>
+ <td>infos</td>
+ <td>Gets usage info for all queues</td>
+ <td>Admin</td>
+ </tr>
+ <tr>
+ <td>setSpending=<code>spending</code> &queue=<code>queue</code></td>
+ <td>Set the spending rate for queue</td>
+ <td>User</td>
+ </tr>
+ <tr>
+ <td>addBudget=<code>budget</code> &queue=<code>queue</code></td>
+ <td>Add budget to queue</td>
+ <td>Admin</td>
+ </tr>
+ <tr>
+ <td>addQueue=<code>queue</code></td>
+ <td>Add queue</td>
+ <td>Admin</td>
+ </tr>
+ <tr>
+ <td>removeQueue=<code>queue</code></td>
+ <td>Remove queue</td>
+ <td>Admin</td>
+ </tr>
+ </table>
+
+ <p>Example response for authorized requests:</p>
+ <source>
+ <QueueInfo>
+ <host>myhost</host>
+ <queue name="queue1">
+ <budget>99972.0</budget>
+ <spending>0.11</spending>
+ <share>0.008979593</share>
+ <used>1</used>
+ <pending>43</pending>
+ </queue>
+ </QueueInfo></source>
+
+ <p>Example response for time request:</p>
+ <source>
+ <QueueInfo>
+ <host>myhost</host>
+ <start>1238600160788</start>
+ <time>1238605061519</time>
+ </QueueInfo></source>
+
+ <p>Example response for price request:</p>
+ <source>
+ <QueueInfo>
+ <host>myhost</host>
+ <price>12.249998</price>
+ </QueueInfo></source>
+
+ <p>Failed authentications/authorizations return HTTP error code 500, ACCESS
+ DENIED: <code>query string</code></p>
+ </section>
+ <section>
+ <title>Configuration</title>
+
+ <table>
+ <tr>
+ <th>Option</th>
+ <th>Default</th>
+ <th>Comment</th>
+ </tr>
+ <tr>
+ <td>mapred.jobtracker. taskScheduler</td>
+ <td>Hadoop FIFO scheduler</td>
+ <td>Needs to be set to org.apache.hadoop.mapred. DynamicPriorityScheduler</td>
+ </tr>
+ <tr>
+ <td>mapred.priority-scheduler. kill-interval</td>
+ <td>0 (don't preempt)</td>
+ <td>Interval between preemption/kill attempts in seconds</td>
+ </tr>
+ <tr>
+ <td>mapred.dynamic-scheduler. alloc-interval</td>
+ <td>20</td>
+ <td>Interval between allocation and accounting updates in seconds</td>
+ </tr>
+ <tr>
+ <td>mapred.dynamic-scheduler. budget-file</td>
+ <td>/etc/hadoop.budget</td>
+ <td>File used to store budget info. The job tracker user needs write access to this file, which must exist.</td>
+ </tr>
+ <tr>
+ <td>mapred.priority-scheduler. acl-file</td>
+ <td>/etc/hadoop.acl</td>
+ <td>File where user keys and roles are stored. The job tracker user needs read access to this file, which must exist.</td>
+ </tr>
+ </table>
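Putting the table together, a mapred-site.xml fragment enabling the scheduler might look like this (a sketch; the property names come from the table above, but the interval values are illustrative, not recommendations):

```xml
<!-- Example mapred-site.xml fragment (illustrative values). -->
<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.DynamicPriorityScheduler</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.alloc-interval</name>
  <value>20</value>
</property>
<property>
  <name>mapred.priority-scheduler.kill-interval</name>
  <value>60</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.budget-file</name>
  <value>/etc/hadoop.budget</value>
</property>
<property>
  <name>mapred.priority-scheduler.acl-file</name>
  <value>/etc/hadoop.acl</value>
</property>
```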
+
+ <p>Budget file format (do not edit manually while the scheduler is running; use the
+servlet API instead):</p>
+ <source>
+ user_1 budget_1 spending_1
+ ...
+ user_n budget_n spending_n</source>
+
+ <p>ACL file format (can be updated without restarting the job tracker):</p>
+ <source>
+ user_1 role_1 key_1
+ ...
+ user_n role_n key_n</source>
+ <p><code>role</code> can be either admin or user.</p>
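For illustration, the ACL file format above can be parsed into a lookup table with a few lines of Python (a hypothetical helper, not part of the scheduler):

```python
def parse_acl(text):
    """Parse "user role key" lines into {user: (role, key)} (sketch).

    Blank lines are skipped; role is expected to be "admin" or "user".
    """
    acl = {}
    for line in text.splitlines():
        if not line.strip():
            continue
        user, role, key = line.split()
        acl[user] = (role, key)
    return acl

acl = parse_acl("alice admin k1\nbob user k2\n")
```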
+ </section>
+ <section>
+ <title>Examples</title>
+ <p>Example request to set the spending rate:</p>
+ <source>
+ http://myhost:50030/scheduler?setSpending=0.01&queue=myqueue</source>
+ <p>The signature is passed in the HTTP Authorization header.</p>
+
+ <p>The signature is created akin to the AWS Query Authentication scheme:</p>
+ <source>HMAC_SHA1("[query path]&user=[user]&timestamp=[timestamp]", key)</source>
+
+ <p>For the servlet operations, the query path is everything that comes after /scheduler?
+ in the URL. For job submission, the query path is just the empty string "".</p>
+ <p>Job submissions also need to set the following job properties:</p>
+
+ <source>
+ -Dmapred.job.timestamp=[ms epoch time]
+ -Dmapred.job.signature=[signature as above]
+ -Dmapreduce.job.queue.name=[queue]</source>
+ <p>Note that the queue name must match the user name of the user submitting the job.</p>
+
+ <p>Example Python query:</p>
+ <source>
+import base64
+import hmac
+import sha
+import httplib, urllib
+import sys
+import time
+from popen2 import popen3
+import os
+
+def hmac_sha1(data, key):
+    # Sign the request data with the secret key; base64-encode and
+    # URL-quote the digest so it can travel in a header (Python 2).
+    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())
+
+stdout, stdin, stderr = popen3("id -un")
+USER = stdout.read().strip()
+f = open(os.path.expanduser("~/.ssh/hadoop_key"))
+KEY = f.read().strip()
+f.close()
+f = open(os.path.expanduser("/etc/hadoop_server"))
+SERVER = f.read().strip()
+f.close()
+URL = "/scheduler"
+conn = httplib.HTTPConnection(SERVER)
+params = sys.argv[1]
+params = params + "&user=%s&timestamp=%d" % (USER,long(time.time()*1000))
+print params
+headers = {"Authorization": hmac_sha1(params, KEY)}
+print headers
+conn.request("GET",URL + "?" + params,None, headers)
+response = conn.getresponse()
+print response.status, response.reason
+data = response.read()
+conn.close()
+print data</source>
+
+<p>Example Python job submission parameter generation:</p>
+<source>
+import base64
+import hmac
+import sha
+import httplib, urllib
+import sys
+import time
+import os
+from popen2 import popen3
+
+def hmac_sha1(data, key):
+    # Same signing helper as in the query example above (Python 2).
+    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())
+
+stdout, stdin, stderr = popen3("id -un")
+USER = stdout.read().strip()
+f = open(os.path.expanduser("~/.ssh/hadoop_key"))
+KEY = f.read().strip()
+f.close()
+if len(sys.argv) > 1:
+ params = sys.argv[1]
+else:
+ params = ""
+timestamp = long(time.time()*1000)
+params = params + "&user=%s&timestamp=%d" % (USER,timestamp)
+print "-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s -Dmapreduce.job.queue.name=%s" % (timestamp, hmac_sha1(params, KEY), USER)</source>
+ </section>
+ </section>
+ <section>
+ <title>Bibliography</title>
+ <p><code>T. Sandholm and K. Lai</code>. <strong>MapReduce optimization using regulated dynamic prioritization</strong>.
+ In SIGMETRICS '09: Proceedings of the eleventh international joint conference on Measurement and modeling of computer systems, pages 299-310, New York, NY, USA, 2009.</p>
+ <p><code>T. Sandholm, K. Lai</code>. <strong>Dynamic Proportional Share Scheduling in Hadoop</strong>.
+ In JSSPP '10: Proceedings of the 15th Workshop on Job Scheduling Strategies for Parallel Processing, Atlanta, April 2010.</p>
+ <p><code>A. Lenk, J. Nimis, T. Sandholm, S. Tai</code>. <strong>An Open Framework to Support the Development of Commercial Cloud Offerings based on Pre-Existing Applications</strong>.
+ In CCV '10: Proceedings of the International Conference on Cloud Computing and Virtualization, Singapore, May 2010.</p>
+ </section>
+
+ </body>
+
+</document>
diff --git a/src/docs/src/documentation/content/xdocs/site.xml b/src/docs/src/documentation/content/xdocs/site.xml
index 013f87e..c1f5e77 100644
--- a/src/docs/src/documentation/content/xdocs/site.xml
+++ b/src/docs/src/documentation/content/xdocs/site.xml
@@ -49,6 +49,7 @@
<docs label="Schedulers">
<cap_scheduler label="Capacity Scheduler" href="capacity_scheduler.html"/>
<fair_scheduler label="Fair Scheduler" href="fair_scheduler.html"/>
+ <dyn_scheduler label="Dynamic Priority Scheduler" href="dynamic_scheduler.html"/>
</docs>
<docs label="Miscellaneous">