MAPREDUCE-2040. Forrest Documentation for Dynamic Priority Scheduler. Contributed by Thomas Sandholm.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1004886 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8746949..eefaf17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -324,6 +324,11 @@
 
 Release 0.21.1 - Unreleased
 
+  NEW FEATURES
+
+    MAPREDUCE-2040. Forrest Documentation for Dynamic Priority Scheduler.
+    (Thomas Sandholm via tomwhite)
+
   BUG FIXES
 
     MAPREDUCE-1897. trunk build broken on compile-mapred-test (cos)
diff --git a/src/docs/src/documentation/content/xdocs/dynamic_scheduler.xml b/src/docs/src/documentation/content/xdocs/dynamic_scheduler.xml
new file mode 100644
index 0000000..be653c3
--- /dev/null
+++ b/src/docs/src/documentation/content/xdocs/dynamic_scheduler.xml
@@ -0,0 +1,456 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+  
+  <header>
+    <title>Dynamic Priority Scheduler</title>
+  </header>
+  
+  <body>
+  
+    <section>
+      <title>Purpose</title>
+      
+      <p>This document describes the Dynamic Priority Scheduler, a pluggable 
+      MapReduce scheduler for Hadoop that automatically manages the QoS of
+      user jobs based on demand.</p>
+    </section>
+    
+    <section>
+      <title>Features</title>
+      
+      <p>The Dynamic Priority Scheduler supports the following features:</p> 
+      <ul>
+        <li>
+          Users may change their dedicated queue capacity at any point in time by specifying how much
+          they are willing to pay for running on a (map or reduce) slot over an allocation interval 
+          (called the spending rate).
+        </li>
+        <li>
+          Users are given a budget when signing up to use the Hadoop cluster. This budget may then be used
+          to run jobs requiring different QoS over time. QoS allocation intervals on the order of a few
+          seconds are supported, allowing the cluster capacity allocations to follow demand closely.
+        </li>
+        <li>
+          The slot capacity share given to a user is proportional to the user's spending rate
+          and inversely proportional to the aggregate spending rate of all users in the
+          same allocation interval. 
+        </li>
+        <li>
+          Preemption is supported but may be disabled.
+        </li>
+        <li>
+          Work conservation is supported. If a user does not use up the slots guaranteed
+          by her spending rate, other users may use these slots. Whoever uses the
+          slots pays for them, but nobody pays more than the spending rate they bid.
+        </li>
+        <li>
+          Users who run no jobs are not charged anything against their budget.
+        </li>
+        <li>
+          When preempting tasks, the tasks that have been running for the shortest
+          period of time will be killed first.
+        </li>
+        <li>
+          A secure REST XML RPC interface allows programmatic access to queue management
+          for both users and administrators. The same access control mechanism has also
+          been integrated into the standard Hadoop job client for submitting jobs to personal queues. 
+        </li>
+        <li>
+          A simple file-based budget management back-end is provided, but database
+          back-ends may be plugged in as well.
+        </li>
+        <li>
+          The scheduler is very lightweight in terms of queue memory footprint and overhead,
+          so a large number (100+) of concurrently served queues is supported.
+        </li>
+      </ul>
+    </section>
+    
+    <section>
+      <title>Problem Addressed</title>
+      
+      <p>Granting users (here, MapReduce job clients) capacity shares on computational clusters is
+       typically done manually by the system administrator of the Hadoop installation. If users
+       want more resources, they need to renegotiate their share either with the administrator or
+       with the competing users. This form of social scheduling works fine for small teams in trusted
+       environments, but breaks down in large-scale, multi-tenant clusters with users from
+       multiple organizations.</p>
+
+       <p>Even if the users are cooperative, it is too complex to renegotiate
+       shares manually. If users' individual jobs vary in importance (criticality to meet certain
+       deadlines) over time and between job types, severe allocation inefficiencies may occur.
+       For example, a user with a high allocated capacity may run large, low-priority test jobs,
+       starving out more important jobs from other users.</p>
+
+    </section>
+    <section>
+      <title>Approach</title>
+      <p>To solve this problem we introduce the concept of dynamically regulated priorities. As
+      before, each user is granted a certain initial quota, which we call a budget. However, instead
+      of mapping the budget to a capacity share statically, we allow users to specify how much
+      of their budget they are willing to spend on each job at any given point in time. We call
+      this amount the spending rate; it is defined in units of budget that a user is willing
+      to spend per task per allocation interval (typically &lt; 1 minute, but configurable).</p>
+     
+      <p>The share allocated to a specific user is her spending rate divided by
+      the sum of the spending rates of all users. The scheduler preempts tasks
+      to guarantee these shares, but it is at the same time work conserving
+      (it lets users exceed their share if no other tasks are running).
+      Furthermore, users are only charged for the fraction of their share they
+      actually use, so if they run no jobs their spending drops to 0. In this
+      text a user is defined as the owner of a queue, which is the unit of
+      Authentication, Authorization, and Accounting (AAA). If there is no need
+      to separate actual users' AAA, then they could share a queue, but that
+      should be seen as equivalent to users sharing a password, which is
+      generally frowned upon.</p>
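+
+      <p>As a minimal sketch (the queue names and spending rates below are made
+      up for illustration, and this is not the scheduler's actual code), the
+      proportional shares are derived from the spending rates as follows:</p>
+      <source>
+# Sketch: proportional-share calculation as described above.
+# Each queue's share is its spending rate divided by the sum of
+# the spending rates of all queues in the allocation interval.
+rates = {"queue1": 0.11, "queue2": 0.05, "queue3": 0.04}
+
+total = sum(rates.values())
+shares = dict((q, r / total) for q, r in rates.items())
+
+for q in sorted(shares):
+    # e.g. queue1 gets 0.11 / 0.20 = 55% of the slots
+    print "%s: %.0f%% of slots" % (q, 100 * shares[q])</source>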
+    </section>
+    <section>
+      <title>Comparison to other Hadoop Schedulers</title>
+      <p>Hadoop On Demand (HOD) allows individual users to create private MapReduce clusters
+       on demand to meet their changing needs over time. Prioritization across users is still a
+       static server configuration, which in essence still relies on users being cooperative during
+       high-demand periods. Further, HOD breaks the data-local scheduling principle of
+       MapReduce and makes it more difficult to share large data sets efficiently.</p>
+
+      <p>The fair-share (FAIR) and capacity (CAP) schedulers are both extensions of a simple
+      FIFO queue that allow relative priorities to be set on queues. There is no notion of charging
+      or accounting on a per-use basis, and shares cannot be set by the users (they have to be
+      negotiated a priori). So although sharing data is more efficient than with HOD, these
+      schedulers suffer from the same kind of social scheduling inefficiencies.</p>
+
+     <p>Amazon Web Services (AWS) Elastic MapReduce allows virtual Hadoop clusters, which
+     read input from and write output to S3, to be set up automatically. New instances can be
+     added to or removed from the cluster to scale jobs up or down. Our approach is more
+     lightweight in that the virtualization is not at the OS level but at the queue level. This
+     allows us to react faster to temporary application bursts and to co-host more concurrent
+     users on each physical node. Furthermore, our approach implements a demand-based
+     pricing model, whereas AWS uses fixed pricing, and our solution works in any
+     Hadoop cluster, not only in an AWS environment. Finally, with AWS you can get up and
+     running and reconfigure within the order of 10 minutes; with this scheduler you can get
+     started and reconfigure within seconds (as specified by alloc-interval below).</p>
+    </section>
+    <section>
+    <title>Implementation and Use</title>
+    <section>
+      <title>Accounting</title>
+      <p>Users specify a spending rate. This is the rate deducted for each
+      active (used) task slot per allocation interval. The shares allocated are
+      not calculated directly from the user-specified spending rates but from
+      the effective rates currently being paid by users. If a user has no
+      pending or running jobs, the effective spending rate for that user is set
+      to 0, which ensures that the user is not charged anything against her
+      budget. If a job is pending, the effective rate is set to the
+      user-specified rate to allow a share to be allocated so the job can be
+      moved from the pending to the active state.</p>
+
+      <p>The number of map tasks and reduce tasks granted to a user in each
+      allocation interval is based on the effective spending rate in the
+      previous allocation interval. If the user only uses a subset of the
+      allocated tasks, only that subset is charged for. Conversely, if the user
+      is able to run more tasks than the granted quota, because other users are
+      not running up to their full quota, only the spending rate times the
+      quota is charged for. The price signaled to users, so that they can
+      estimate shares accurately, is based on the effective rates.</p>
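+
+      <p>As a minimal sketch of the charging rule above (the rate, slot counts,
+      and function name are made up for illustration):</p>
+      <source>
+# Sketch: per-interval charging as described above. A queue pays its
+# spending rate per slot actually used, but never for more slots
+# than its granted quota.
+def charge(rate, used_slots, granted_quota):
+    return rate * min(used_slots, granted_quota)
+
+print charge(0.11, 5, 10)   # half the quota used: charged 0.55
+print charge(0.11, 15, 10)  # work-conserving overflow: charged only 1.1</source>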
+    </section>
+    <section>
+      <title>Preemption</title>
+      <p>Tasks run in excess of the quota allocated to a particular user are
+      subject to preemption. No preemption will occur unless there are pending
+      tasks for a user who has not received her full quota. Queues (users) to
+      preempt are picked based on the latest start time of jobs in excess of
+      their quota.</p>
+
+      <p>Excess jobs in a queue will not be killed unless all excess jobs have
+      been killed from queues with later start times. Within a queue that is
+      exceeding its quota, the tasks that have run for the shortest time are
+      killed first. All killed tasks are put back in a pending state and will
+      thus be resubmitted as soon as existing tasks finish or the effective
+      share of the queue goes up, e.g. because other users' jobs finish. The
+      preemption interval may be set equal to the scheduling interval, to a
+      longer interval, or to 0, in which case no preemption (task killing) is
+      done.</p>
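+
+      <p>A minimal sketch of this victim-selection order (the data structures
+      and function names are made up for illustration; this is not the
+      scheduler's actual code):</p>
+      <source>
+# Sketch: choose tasks to preempt as described above. Queues over
+# quota are visited latest start time first; within each queue the
+# shortest-running tasks are killed first.
+def pick_victims(queues, slots_needed):
+    victims = []
+    over = [q for q in queues if q.used > q.quota]
+    over.sort(key=lambda q: q.latest_job_start, reverse=True)
+    for q in over:
+        excess = q.used - q.quota
+        tasks = sorted(q.running_tasks, key=lambda t: t.run_time)
+        take = min(excess, slots_needed - len(victims))
+        victims.extend(tasks[:take])
+        if len(victims) >= slots_needed:
+            break
+    return victims</source>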
+    </section>
+    <section>
+      <title>Security</title>
+      <p>If any user can impersonate any other user, or if any user can submit
+      to any queue, the economic incentives of the mechanism in the scheduler
+      break down and the problem of social scheduling and free-riding users
+      comes back. Hence, stronger authentication and authorization are required
+      in this scheduler compared to the other schedulers. Since the
+      authorization mechanisms are still being worked out in Hadoop, we
+      implemented a simple shared-secret, signature-based protocol inspired by
+      the AWS query (REST) API authentication used for EC2 services.</p>
+
+      <p>Simple guards against replay attacks and clock/nonce synchronization
+      are also available, but the main idea is to require that users prove
+      possession of a secret key to be allowed to submit jobs or control queue
+      spending rates. There are currently two roles implemented: users and
+      administrators. In general, users have access to information such as the
+      current price of the cluster (the aggregate spending rate of active
+      queues) and their own queue settings and usage, and they can change the
+      spending rate of their queue. Admin users may also add budgets to queues,
+      create and remove queues, and get detailed usage info from all
+      queues.</p>
+
+      <p>The basic authentication model relies on the clients calculating an
+      HMAC/SHA1 signature across their request input, a timestamp, and their
+      user name, using their secret key. The server looks up the user's secret
+      key in an ACL and determines the granted role after verifying that the
+      signature is valid. For the REST API discussed next, the signature is
+      passed in the standard HTTP Authorization header; for the job submission
+      protocol it is carried in a job configuration parameter
+      (mapred.job.signature).</p>
+
+     <p>If stronger authentication protocols (e.g. asymmetric keys) are
+     developed for Hadoop, at least the job submission protocol will be adapted
+     to use them.</p>
+    </section>
+    <section>
+      <title>Control</title>
+      <p>The job tracker installs a servlet that accepts signed HTTP GET
+      requests and returns well-formed XML responses. The servlet is installed
+      in the scheduler context (like the fair-share scheduler UI) at
+      [job tracker URL]/scheduler.</p>
+
+     <table>
+            <tr>
+              <th>HTTP Option</th>
+              <th>Description</th>
+              <th>Authz Required</th>
+            </tr>
+            <tr>
+              <td>price</td>
+              <td>Gets current price</td>
+              <td>None</td>
+            </tr>
+            <tr>
+              <td>time</td>
+              <td>Gets current time</td>
+              <td>None</td>
+            </tr>
+            <tr>
+              <td>info=<code>queue</code></td>
+              <td>Gets usage info for the specified queue</td>
+              <td>User</td>
+            </tr>
+            <tr>
+              <td>info</td>
+              <td>Gets usage info for the caller's queue</td>
+              <td>User</td>
+            </tr>
+            <tr>
+              <td>infos</td>
+              <td>Gets usage info for all queues</td>
+              <td>Admin</td>
+            </tr>
+            <tr>
+              <td>setSpending=<code>spending</code>&amp;queue=<code>queue</code></td>
+              <td>Sets the spending rate for a queue</td>
+              <td>User</td>
+            </tr>
+            <tr>
+              <td>addBudget=<code>budget</code>&amp;queue=<code>queue</code></td>
+              <td>Adds budget to a queue</td>
+              <td>Admin</td>
+            </tr>
+            <tr>
+              <td>addQueue=<code>queue</code></td>
+              <td>Adds a queue</td>
+              <td>Admin</td>
+            </tr>
+            <tr>
+              <td>removeQueue=<code>queue</code></td>
+              <td>Removes a queue</td>
+              <td>Admin</td>
+            </tr>
+     </table>
+
+     <p>Example response for authorized requests:</p>
+     <source>
+       &lt;QueueInfo&gt;
+         &lt;host&gt;myhost&lt;/host&gt;
+         &lt;queue name="queue1"&gt;
+           &lt;budget&gt;99972.0&lt;/budget&gt;
+           &lt;spending&gt;0.11&lt;/spending&gt;
+           &lt;share&gt;0.008979593&lt;/share&gt;
+           &lt;used&gt;1&lt;/used&gt;
+           &lt;pending&gt;43&lt;/pending&gt;
+         &lt;/queue&gt;
+       &lt;/QueueInfo&gt;</source>
+
+     <p>Example response for time request:</p>
+     <source>
+       &lt;QueueInfo&gt;
+         &lt;host&gt;myhost&lt;/host&gt;
+         &lt;start&gt;1238600160788&lt;/start&gt;
+         &lt;time&gt;1238605061519&lt;/time&gt;
+       &lt;/QueueInfo&gt;</source>
+
+     <p>Example response for price request:</p>
+     <source>
+       &lt;QueueInfo&gt;
+         &lt;host&gt;myhost&lt;/host&gt;
+         &lt;price&gt;12.249998&lt;/price&gt;
+       &lt;/QueueInfo&gt;</source>
+
+     <p>Failed authentications/authorizations return HTTP error code 500 with
+      the message ACCESS DENIED: <code>query string</code>.</p>
+    </section>
+    <section>
+      <title>Configuration</title>
+
+      <table>
+            <tr>
+              <th>Option</th>
+              <th>Default</th>
+              <th>Comment</th>
+            </tr>
+            <tr>
+              <td>mapred.jobtracker.taskScheduler</td>
+              <td>Hadoop FIFO scheduler</td>
+              <td>Needs to be set to org.apache.hadoop.mapred.DynamicPriorityScheduler</td>
+            </tr>
+            <tr>
+              <td>mapred.priority-scheduler.kill-interval</td>
+              <td>0 (don't preempt)</td>
+              <td>Interval between preemption/kill attempts in seconds</td>
+            </tr>
+            <tr>
+              <td>mapred.dynamic-scheduler.alloc-interval</td>
+              <td>20</td>
+              <td>Interval between allocation and accounting updates in seconds</td>
+            </tr>
+            <tr>
+              <td>mapred.dynamic-scheduler.budget-file</td>
+              <td>/etc/hadoop.budget</td>
+              <td>File used to store budget info. The job tracker user needs write access to this file, which must exist.</td>
+            </tr>
+            <tr>
+              <td>mapred.priority-scheduler.acl-file</td>
+              <td>/etc/hadoop.acl</td>
+              <td>File where user keys and roles are stored. The job tracker user needs read access to this file, which must exist.</td>
+            </tr>
+      </table>
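+
+     <p>As an illustration, the scheduler could be enabled in mapred-site.xml
+     as follows (the kill-interval value is an example; the file paths repeat
+     the defaults above):</p>
+     <source>
+&lt;property&gt;
+  &lt;name&gt;mapred.jobtracker.taskScheduler&lt;/name&gt;
+  &lt;value&gt;org.apache.hadoop.mapred.DynamicPriorityScheduler&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;mapred.dynamic-scheduler.alloc-interval&lt;/name&gt;
+  &lt;value&gt;20&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;mapred.priority-scheduler.kill-interval&lt;/name&gt;
+  &lt;value&gt;60&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;mapred.dynamic-scheduler.budget-file&lt;/name&gt;
+  &lt;value&gt;/etc/hadoop.budget&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;mapred.priority-scheduler.acl-file&lt;/name&gt;
+  &lt;value&gt;/etc/hadoop.acl&lt;/value&gt;
+&lt;/property&gt;</source>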
+
+     <p>Budget file format (do not edit this file manually while the scheduler
+     is running; use the servlet API instead):</p>
+     <source>
+       user_1 budget_1 spending_1
+       ...
+       user_n budget_n spending_n</source>
+
+     <p>ACL File Format (can be updated without restarting Jobtracker):</p>
+     <source>
+       user_1 role_1 key_1
+       ...
+       user_n role_n key_n</source>
+     <p><code>role</code> can be either admin or user.</p>
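+
+     <p>As an illustration, an ACL file for two hypothetical users, alice
+     (user role) and bob (admin role), could look like this (the keys are
+     made-up shared-secret strings):</p>
+     <source>
+       alice user 7Dwh3G2YbK1w9pQz
+       bob admin Xp4Lr8MnTq2Vs6Jd</source>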
+    </section>
+    <section>
+    <title>Examples</title>
+    <p>Example request to set the spending rate:</p>
+    <source>
+  http://myhost:50030/scheduler?setSpending=0.01&amp;queue=myqueue</source>
+  <p>The signature is passed in the HTTP Authorization header.</p>
+
+  <p>The signature is created similarly to the AWS Query API authentication scheme:</p>
+  <source>HMAC_SHA1("[query path]&amp;user=[user]&amp;timestamp=[timestamp]", key)</source>
+
+  <p>For the servlet operations, the query path is everything that comes after
+     /scheduler? in the URL. For job submission, the query path is just the
+     empty string "".</p>
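+
+  <p>As a worked illustration (the timestamp is made up, and the user name
+  matches the queue as required below), the string signed for the setSpending
+  request above would be:</p>
+  <source>
+setSpending=0.01&amp;queue=myqueue&amp;user=myqueue&amp;timestamp=1238600160788</source>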
+   <p>Job submissions also need to set the following job properties:</p>
+
+  <source>
+   -Dmapred.job.timestamp=[ms epoch time] 
+   -Dmapred.job.signature=[signature as above] 
+   -Dmapreduce.job.queue.name=[queue]</source>
+  <p>Note that the queue name must match the name of the user submitting the job.</p>
+
+   <p>Example Python query:</p>
+   <source>
+import base64
+import hmac
+import sha
+import httplib, urllib
+import sys
+import time
+from popen2 import popen3
+import os
+
+def hmac_sha1(data, key):
+    # Sign the data with HMAC/SHA1 and URL-encode the base64 signature.
+    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())
+
+# Determine the requesting user name and read the shared secret key.
+stdout, stdin, stderr = popen3("id -un")
+USER = stdout.read().strip()
+f = open(os.path.expanduser("~/.ssh/hadoop_key"))
+KEY = f.read().strip()
+f.close()
+# Read the job tracker host:port from a local configuration file.
+f = open("/etc/hadoop_server")
+SERVER = f.read().strip()
+f.close()
+URL = "/scheduler"
+conn = httplib.HTTPConnection(SERVER)
+# Append the user name and a millisecond timestamp to the query
+# (e.g. "setSpending=0.01&amp;queue=myqueue") and sign the whole string.
+params = sys.argv[1]
+params = params + "&amp;user=%s&amp;timestamp=%d" % (USER, long(time.time()*1000))
+print params
+# The signature is passed in the standard HTTP Authorization header.
+headers = {"Authorization": hmac_sha1(params, KEY)}
+print headers
+conn.request("GET", URL + "?" + params, None, headers)
+response = conn.getresponse()
+print response.status, response.reason
+data = response.read()
+conn.close()
+print data</source>
+
+<p>Example Python job submission parameter generation:</p>
+<source>
+import base64
+import hmac
+import sha
+import urllib
+import sys
+import time
+import os
+from popen2 import popen3
+
+def hmac_sha1(data, key):
+    # Sign the data with HMAC/SHA1 and URL-encode the base64 signature.
+    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())
+
+# Determine the submitting user name and read the shared secret key.
+stdout, stdin, stderr = popen3("id -un")
+USER = stdout.read().strip()
+f = open(os.path.expanduser("~/.ssh/hadoop_key"))
+KEY = f.read().strip()
+f.close()
+# Optional extra parameters may be passed as the first argument; for
+# job submission the query path itself is the empty string.
+if len(sys.argv) > 1:
+  params = sys.argv[1]
+else:
+  params = ""
+timestamp = long(time.time()*1000)
+params = params + "&amp;user=%s&amp;timestamp=%d" % (USER, timestamp)
+# Print the properties to pass to the Hadoop job client (the queue
+# name must match the submitting user).
+print "-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s -Dmapreduce.job.queue.name=%s" % (timestamp, hmac_sha1(params, KEY), USER)</source>
+    </section>
+    </section>
+    <section>
+      <title>Bibliography</title>
+      <p><em>T. Sandholm and K. Lai</em>. <strong>MapReduce Optimization Using Regulated Dynamic Prioritization</strong>. 
+       In SIGMETRICS '09: Proceedings of the Eleventh International Joint Conference on Measurement and Modeling of Computer Systems, pages 299-310, New York, NY, USA, 2009.</p>
+      <p><em>T. Sandholm and K. Lai</em>. <strong>Dynamic Proportional Share Scheduling in Hadoop</strong>. 
+      In JSSPP '10: Proceedings of the 15th Workshop on Job Scheduling Strategies for Parallel Processing, Atlanta, April 2010.</p>
+      <p><em>A. Lenk, J. Nimis, T. Sandholm, and S. Tai</em>. <strong>An Open Framework to Support the Development of Commercial Cloud Offerings Based on Pre-Existing Applications</strong>. 
+      In CCV '10: Proceedings of the International Conference on Cloud Computing and Virtualization, Singapore, May 2010.</p>
+    </section>
+
+  </body>
+  
+</document>
diff --git a/src/docs/src/documentation/content/xdocs/site.xml b/src/docs/src/documentation/content/xdocs/site.xml
index 013f87e..c1f5e77 100644
--- a/src/docs/src/documentation/content/xdocs/site.xml
+++ b/src/docs/src/documentation/content/xdocs/site.xml
@@ -49,6 +49,7 @@
     <docs label="Schedulers">
         <cap_scheduler 		label="Capacity Scheduler"     href="capacity_scheduler.html"/>
 		<fair_scheduler 			label="Fair Scheduler"            href="fair_scheduler.html"/>
+		<dyn_scheduler 			label="Dynamic Priority Scheduler"            href="dynamic_scheduler.html"/>
     </docs>
    
    <docs label="Miscellaneous">