# Copyright 2008 The Apache Software Foundation Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
# required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
This package implements dynamic priority scheduling for MapReduce jobs.
Overview
--------
The purpose of this scheduler is to allow users to increase and decrease
their queue priorities continuously to meet the requirements of their
current workloads. The scheduler is aware of the current demand and makes
it more expensive to boost a priority during peak usage times. Thus
users who move their workload to low-usage times are rewarded with
discounts. Priorities can only be boosted within a limited quota.
All users are given a quota or budget which is deducted periodically
in configurable accounting intervals. How much of the budget is
deducted is determined by a per-user spending rate, which may
be modified at any time directly by the user. The share of cluster
slots allocated to a particular user is computed as that user's
spending rate divided by the sum of all spending rates in the same
accounting period.
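
As a minimal sketch of this formula (the function and variable names
below are illustrative, not part of the scheduler API):

def compute_shares(spending_rates, total_slots):
    # spending_rates: queue name -> current spending rate
    # Each queue's share is its rate divided by the sum of all rates.
    total_rate = sum(spending_rates.values())
    return {queue: total_slots * rate / total_rate
            for queue, rate in spending_rates.items()}

# Example: with rates 0.01 and 0.03, the second queue gets 75 of 100 slots.
print(compute_shares({"queueA": 0.01, "queueB": 0.03}, total_slots=100))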
Configuration
-------------
This scheduler comprises two components: an accounting or resource-allocation
part that manages and bills for queue shares, and a scheduler that
enforces the queue shares in the form of map and reduce slots of running jobs.
Hadoop Configuration (e.g. hadoop-site.xml):
mapreduce.jobtracker.taskscheduler
This needs to be set to
org.apache.hadoop.mapred.DynamicPriorityScheduler
to use the dynamic scheduler.
Scheduler Configuration:
mapred.dynamic-scheduler.scheduler
The Java path of the MapReduce scheduler that should
enforce the allocated shares.
It has been tested with the default:
org.apache.hadoop.mapred.PriorityScheduler
mapred.priority-scheduler.acl-file
Full OS path of the ACL file with syntax:
<user> <role> <secret key>
separated by line feeds
mapred.dynamic-scheduler.budget-file
The full OS path of the file from which the
budgets are read and stored. The syntax of this file is:
<queue name> <budget> <spending rate>
separated by newlines where budget can be specified
as a Java float. The file should not be edited
directly while the server is running; use the
servlet API instead to ensure proper synchronization.
mapred.dynamic-scheduler.alloc-interval
Allocation interval at which the scheduler rereads the
spending rates and recalculates the cluster shares,
specified as the number of seconds between allocations.
Default is 20 seconds.
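
For illustration, the settings above might be wired up in hadoop-site.xml
as follows (the file paths are examples only):

<property>
  <name>mapreduce.jobtracker.taskscheduler</name>
  <value>org.apache.hadoop.mapred.DynamicPriorityScheduler</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.scheduler</name>
  <value>org.apache.hadoop.mapred.PriorityScheduler</value>
</property>
<property>
  <name>mapred.priority-scheduler.acl-file</name>
  <value>/etc/hadoop/scheduler.acl</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.budget-file</name>
  <value>/etc/hadoop/scheduler.budget</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.alloc-interval</name>
  <value>20</value>
</property>

A matching ACL file entry and budget file entry could then look like:
user1 user secretkey123
queue1 100.0 0.01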
Servlet API
-----------
Queues should be managed through the servlet REST API
while the jobtracker server is running.
It is installed at
[job tracker URL]/scheduler
Operations supported:
price
get the current price of the cluster
(aggregate spending rates of queues with running or pending jobs)
time
get start time of server and current time in epoch units
info=queue_to_query
get info about a queue (requires user or admin privilege)
infos
get info about all queues (requires admin privilege)
addBudget=budget_to_add,queue=queue_to_change
add budget to queue (requires admin privilege)
setSpending=spending_to_set,queue=queue_to_change
set spending rate of queue (requires user or admin privilege)
addQueue=queue_to_add
add new queue (requires admin privilege)
removeQueue=queue_to_remove
remove queue (requires admin privilege)
Example:
http://myhost:50030/scheduler?setSpending=0.01&queue=myqueue
The Authorization header is used to convey the signature.
The signature is created akin to the AWS Query Authentication scheme:
HMAC_SHA1("<query path>&user=<user>&timestamp=<timestamp>", key)
For the servlet operations, the query path is everything that comes after
/scheduler? in the URL. For job submission, the query path is the empty string "".
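For example, for the setSpending call above, the string passed to
HMAC_SHA1 together with the user's secret key would be (user and
timestamp values are illustrative):
setSpending=0.01&queue=myqueue&user=myuser&timestamp=1234567890000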
Job submissions also need to set the following job properties:
-Dmapred.job.timestamp=<ms epoch time>
-Dmapred.job.signature=<signature as above> -Dmapreduce.job.queuename=<queue>
Note that the queue name must match the user submitting the job.
Example python query
--------------------------------
import base64
import hashlib
import hmac
import http.client
import os
import subprocess
import sys
import time
import urllib.parse

def hmac_sha1(data, key):
    # Sign the query string with HMAC-SHA1, base64-encode the
    # digest, and URL-encode the result for the Authorization header.
    digest = hmac.new(key, data, hashlib.sha1).digest()
    return urllib.parse.quote(base64.encodebytes(digest).strip())

# The signing user is the current OS user.
USER = subprocess.run(["id", "-un"], capture_output=True,
                      text=True).stdout.strip()
# The secret key must match the user's entry in the ACL file.
with open(os.path.expanduser("~/.ssh/hadoop_key")) as f:
    KEY = f.read().strip()
with open("/etc/hadoop_server") as f:
    SERVER = f.read().strip()

URL = "/scheduler"
conn = http.client.HTTPConnection(SERVER)
# The query path (everything after /scheduler?) is the first
# command-line argument; user and timestamp are appended before signing.
params = sys.argv[1]
params += "&user=%s&timestamp=%d" % (USER, int(time.time() * 1000))
print(params)
headers = {"Authorization": hmac_sha1(params.encode(), KEY.encode())}
print(headers)
conn.request("GET", URL + "?" + params, None, headers)
response = conn.getresponse()
print(response.status, response.reason)
data = response.read()
conn.close()
print(data)
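
Saved as, say, scheduler-query.py (a hypothetical name), the script is
invoked with the query path as its only argument:
python3 scheduler-query.py 'setSpending=0.01&queue=myqueue'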
Example python job submission parameter generation
--------------------------------------------------
import base64
import hashlib
import hmac
import os
import subprocess
import sys
import time
import urllib.parse

def hmac_sha1(data, key):
    # Same signing helper as in the query example above.
    digest = hmac.new(key, data, hashlib.sha1).digest()
    return urllib.parse.quote(base64.encodebytes(digest).strip())

USER = subprocess.run(["id", "-un"], capture_output=True,
                      text=True).stdout.strip()
with open(os.path.expanduser("~/.ssh/hadoop_key")) as f:
    KEY = f.read().strip()

# For job submission the query path is the empty string, so only
# the user and timestamp are signed.
params = sys.argv[1] if len(sys.argv) > 1 else ""
timestamp = int(time.time() * 1000)
params += "&user=%s&timestamp=%d" % (USER, timestamp)
print("-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s "
      "-Dmapreduce.job.queuename=%s"
      % (timestamp, hmac_sha1(params.encode(), KEY.encode()), USER))