# Copyright 2008 The Apache Software Foundation Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
# required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

This package implements dynamic priority scheduling for MapReduce jobs.

Overview
--------
The purpose of this scheduler is to allow users to increase and decrease
their queue priorities continuously to meet the requirements of their
current workloads. The scheduler is aware of the current demand and makes
it more expensive to boost the priority during peak usage times. Thus
users who move their workload to low-usage times are rewarded with
discounts. Priorities can only be boosted within a limited quota.
All users are given a quota, or budget, which is deducted periodically
in configurable accounting intervals. How much of the budget is
deducted is determined by a per-user spending rate, which may
be modified at any time directly by the user. The cluster slot
share allocated to a particular user is computed as that user's
spending rate over the sum of all spending rates in the same
accounting period.
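
As a minimal sketch of this allocation rule (the queue names and
spending rates below are made-up examples, not part of the scheduler):

# Each queue's share is its spending rate divided by the sum of all
# spending rates in the same accounting period.
spending_rates = {"queueA": 0.05, "queueB": 0.15, "queueC": 0.05}
total = sum(spending_rates.values())
shares = {queue: rate / total for queue, rate in spending_rates.items()}
print(shares)  # {'queueA': 0.2, 'queueB': 0.6, 'queueC': 0.2}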

Configuration
-------------
This scheduler comprises two components: an accounting (or resource
allocation) part that manages and bills for queue shares, and a
scheduler that enforces the queue shares in the form of map and
reduce slots of running jobs.

Hadoop Configuration (e.g. hadoop-site.xml):
  mapreduce.jobtracker.taskscheduler
    This needs to be set to
    org.apache.hadoop.mapred.DynamicPriorityScheduler
    to use the dynamic scheduler.
Scheduler Configuration:
  mapred.dynamic-scheduler.scheduler
    The Java class name of the MapReduce scheduler that should
    enforce the allocated shares. It has been tested with the
    default, org.apache.hadoop.mapred.PriorityScheduler.
  mapred.priority-scheduler.acl-file
    Full path of the ACL file with the syntax
      <user> <role> <secret key>
    separated by line feeds.
  mapred.dynamic-scheduler.budget-file
    The full OS path of the file from which the budgets
    are read and stored. The syntax of this file is
      <queue name> <budget> <spending rate>
    separated by newlines, where the budget can be specified
    as a Java float. The file should not be edited directly
    while the server is running, but through the servlet API,
    to ensure proper synchronization (see the sketch after
    this list).

  mapred.dynamic-scheduler.alloc-interval
    The allocation interval, at which the scheduler rereads the
    spending rates and recalculates the cluster shares.
    Specified as seconds between allocations.
    Default is 20 seconds.
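
For illustration, the budget-file syntax above can be exercised with
a short sketch (hypothetical queue names and values; the real file
should only be changed through the servlet API):

# Hypothetical budget file contents: <queue name> <budget> <spending rate>
budget_file = """queueA 100.0 0.01
queueB 250.5 0.05"""
for line in budget_file.splitlines():
    queue, budget, rate = line.split()
    print(queue, float(budget), float(rate))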

Servlet API
-----------
Queues should be managed through the servlet REST API
when the jobtracker server is running.

It is installed at
  [job tracker URL]/scheduler
Operations supported:
  price
    get the current price of the cluster
    (aggregate spending rates of queues with running or pending jobs)
  time
    get the start time of the server and the current time, in epoch units
  info=queue_to_query
    get info about a queue (requires user or admin privilege)
  infos
    get info about all queues (requires admin privilege)
  addBudget=budget_to_add,queue=queue_to_change
    add budget to a queue (requires admin privilege)
  setSpending=spending_to_set,queue=queue_to_change
    set the spending rate of a queue (requires user or admin privilege)
  addQueue=queue_to_add
    add a new queue (requires admin privilege)
  removeQueue=queue_to_remove
    remove a queue (requires admin privilege)

Example:
  http://myhost:50030/scheduler?setSpending=0.01&queue=myqueue
The Authorization header is used for signing.
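
Other operations follow the same pattern, for instance (same
hypothetical host and queue as in the example above):
  http://myhost:50030/scheduler?price
  http://myhost:50030/scheduler?info=myqueue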

The signature is created akin to the AWS Query Authentication scheme:
  HMAC_SHA1("<query path>&user=<user>&timestamp=<timestamp>", key)
For the servlet operations, the query path is everything that comes
after /scheduler? in the URL. For job submission, the query path is
just the empty string "".
Job submissions also need to set the following job properties:
  -Dmapred.job.timestamp=<ms epoch time>
  -Dmapred.job.signature=<signature as above>
  -Dmapreduce.job.queuename=<queue>
Note that the queue must match the user submitting the job.
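
As a worked illustration (hypothetical user and timestamp), the string
signed for the setSpending example above would be:
  setSpending=0.01&queue=myqueue&user=alice&timestamp=1199145600000
and the base64-encoded, URL-quoted HMAC-SHA1 digest of that string,
computed with the user's secret key, is sent in the Authorization header.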

Example Python query
--------------------
import base64
import getpass
import hashlib
import hmac
import http.client
import os
import sys
import time
import urllib.parse

def hmac_sha1(data, key):
    # Sign the query string with HMAC-SHA1; base64-encode and
    # URL-quote the digest as expected by the scheduler servlet.
    digest = hmac.new(key.encode(), data.encode(), hashlib.sha1).digest()
    return urllib.parse.quote(base64.b64encode(digest).decode())

USER = getpass.getuser()  # current login name (equivalent to `id -un`)
with open(os.path.expanduser("~/.ssh/hadoop_key")) as f:
    KEY = f.read().strip()
with open("/etc/hadoop_server") as f:
    SERVER = f.read().strip()

URL = "/scheduler"
params = sys.argv[1]
params += "&user=%s&timestamp=%d" % (USER, int(time.time() * 1000))
print(params)
headers = {"Authorization": hmac_sha1(params, KEY)}
print(headers)

conn = http.client.HTTPConnection(SERVER)
conn.request("GET", URL + "?" + params, None, headers)
response = conn.getresponse()
print(response.status, response.reason)
data = response.read()
conn.close()
print(data.decode())
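
If the query script above is saved as, say, scheduler_query.py (a
hypothetical name), it could be invoked as:
  python scheduler_query.py "setSpending=0.01&queue=myqueue"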

Example Python job submission parameter generation
--------------------------------------------------
import base64
import getpass
import hashlib
import hmac
import os
import sys
import time
import urllib.parse

def hmac_sha1(data, key):
    # Same signing helper as in the query example above.
    digest = hmac.new(key.encode(), data.encode(), hashlib.sha1).digest()
    return urllib.parse.quote(base64.b64encode(digest).decode())

USER = getpass.getuser()  # current login name (equivalent to `id -un`)
with open(os.path.expanduser("~/.ssh/hadoop_key")) as f:
    KEY = f.read().strip()

# For job submission the signed query path is the empty string,
# so the signed data is just the user and timestamp parameters.
params = sys.argv[1] if len(sys.argv) > 1 else ""
timestamp = int(time.time() * 1000)
params += "&user=%s&timestamp=%d" % (USER, timestamp)
print("-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s"
      " -Dmapreduce.job.queuename=%s"
      % (timestamp, hmac_sha1(params, KEY), USER))