blob: 5daa897d4afc1f9de5ffdb3c9bc5442640de7231 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Abstract class for implementing a persistent store
* of allocation information.
*/
public abstract class AllocationStore {
  /**
   * In-memory cache of per-queue state, keyed by queue name.
   * Every method of this class that touches the cache is synchronized on
   * this store; code that accesses the cache directly should hold the
   * same monitor.
   */
  Map<String,BudgetQueue> queueCache = new HashMap<String,BudgetQueue>();

  /**
   * Initializes configuration
   * @param conf MapReduce configuration
   */
  public abstract void init(Configuration conf);

  /**
   * Loads allocations from persistent store
   */
  public abstract void load();

  /**
   * Saves allocations to persistent store
   */
  public abstract void save();

  /**
   * Gets current remaining budget associated with queue.
   * @param queue name of queue
   * @return budget in credits, or 0 if the queue is not known
   */
  public synchronized float getBudget(String queue) {
    BudgetQueue budgetQueue = queueCache.get(queue);
    return (budgetQueue == null) ? 0.0f : budgetQueue.budget;
  }

  /**
   * Gets current spending rate associated with queue.
   * @param queue name of queue
   * @return spending rate in credits per allocation interval to be
   * deducted from budget, or 0 if the queue is not known
   */
  public synchronized float getSpending(String queue) {
    BudgetQueue budgetQueue = queueCache.get(queue);
    return (budgetQueue == null) ? 0.0f : budgetQueue.spending;
  }

  /**
   * Adds budget to queue. Does nothing if the queue is not known.
   * @param queue name of queue
   * @param budget in credits to be added to queue
   */
  public synchronized void addBudget(String queue, float budget) {
    BudgetQueue budgetQueue = queueCache.get(queue);
    if (budgetQueue == null) {
      return;
    }
    budgetQueue.addBudget(budget);
  }

  /**
   * Adds new queue with zero budget and zero spending rate.
   * If a queue with the same name is already cached, its entry is
   * replaced by the fresh one.
   * @param queue name of queue
   */
  public synchronized void addQueue(String queue) {
    queueCache.put(queue, new BudgetQueue(queue,0.0f,0.0f));
  }

  /**
   * Gets queue info.
   * The fields are read under the store lock so that the returned
   * values are consistent with each other.
   * @param queue name of queue
   * @return xml representation of queue info as a string, or the empty
   * string if the queue is not known
   */
  public synchronized String getQueueInfo(String queue) {
    BudgetQueue budgetQueue = queueCache.get(queue);
    if (budgetQueue == null) {
      return "";
    }
    return "<budget>" + Float.toString(budgetQueue.budget) + "</budget>\n" +
        "<spending>" + Float.toString(budgetQueue.spending) + "</spending>\n" +
        "<used>" + Integer.toString(budgetQueue.used) + "</used>\n" +
        "<pending>" + budgetQueue.pending + "</pending>\n";
  }

  /**
   * Remove queue. Does nothing if the queue is not known.
   * @param queue name of queue
   */
  public synchronized void removeQueue(String queue) {
    queueCache.remove(queue);
  }

  /**
   * Sets spending rate for queue. Does nothing if the queue is not known.
   * @param queue name of queue
   * @param spending spending rate in credits per allocation interval to be
   * deducted from budget
   */
  public synchronized void setSpending(String queue, float spending) {
    BudgetQueue budgetQueue = queueCache.get(queue);
    if (budgetQueue == null) {
      return;
    }
    budgetQueue.spending = spending;
  }

  /**
   * Sets queue usage for accounting. Does nothing if the queue is not
   * known.
   * @param queue name of queue
   * @param used slots currently in use
   * @param pending pending tasks
   */
  public synchronized void setUsage(String queue, int used, int pending) {
    BudgetQueue budgetQueue = queueCache.get(queue);
    if (budgetQueue == null) {
      return;
    }
    budgetQueue.used = used;
    budgetQueue.pending = pending;
  }

  /**
   * Gets queue status (budget, spending, usage)
   * The returned collection is a snapshot taken under the store lock:
   * it can be iterated while queues are concurrently added or removed,
   * and changing the collection itself does not affect the store. The
   * elements are the store's own queue objects, not copies.
   * @return collection of queue status objects
   */
  public synchronized Collection<BudgetQueue> getQueues() {
    return new ArrayList<BudgetQueue>(queueCache.values());
  }
}