blob: 36b4d271804d57a9588e32f9b7902092e6b0d20e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.orchestration;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
/**
* Manages the statically configured user quotas for both the proxy user in user.to.proxy configuration and the API requester(s)
* Is used by the dag manager to ensure that the number of currently running jobs do not exceed the quota, if the quota
* is exceeded, then the execution will fail without running on the underlying executor
*/
@Slf4j
@Singleton
public class UserQuotaManager {
public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
public static final String QUOTA_SEPERATOR = ":";
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> perUserQuota;
private final Map<String, Integer> perFlowGroupQuota;
Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
private final int defaultQuota;
@Inject
public UserQuotaManager(Config config) {
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
// Quotas will take form of user:<Quota> and flowGroup:<Quota>
for (String flowGroupQuota : ConfigUtils.getStringList(config, PER_FLOWGROUP_QUOTA)) {
flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
}
// Keep quotas per user as well in form user:<Quota> which apply for all flowgroups
for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
}
this.perUserQuota = userMapBuilder.build();
this.perFlowGroupQuota = flowGroupMapBuilder.build();
}
/**
* Checks if the dagNode exceeds the statically configured user quota for both the proxy user, requester user, and flowGroup
* @throws QuotaExceededException if the quota is exceeded, and logs a statement
*/
public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws QuotaExceededException {
// Dag is already being tracked, no need to double increment for retries and multihop flows
if (isDagCurrentlyRunning(dagNode)) {
return;
}
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased
StringBuilder requesterMessage = new StringBuilder();
runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
if (proxyUser != null) {
int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds
if (!proxyUserCheck) {
// add 1 to proxyUserIncrement since count starts at 0, and is negative if quota is exceeded
requesterMessage.append(String.format(
"Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
}
}
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
if (serializedRequesters != null) {
List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
for (String requester : uniqueRequesters) {
int userQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getUserQuotaKey(requester, dagNode), requesterToJobCount, dagNode, getQuotaForUser(requester));
boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds
usersQuotaIncrement.add(requester);
requesterCheck = requesterCheck && thisRequesterCheck;
if (!thisRequesterCheck) {
requesterMessage.append(String.format(
"Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n",
requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
}
}
}
int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
if (!flowGroupCheck) {
requesterMessage.append(String.format(
"Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
}
// Throw errors for reach quota at the end to avoid inconsistent job counts
if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
// roll back the increased counts in this block
decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
decrementQuotaUsage(flowGroupToJobCount, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
decrementQuotaUsageForUsers(usersQuotaIncrement);
runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
throw new QuotaExceededException(requesterMessage.toString());
}
}
/**
* Increment quota by one for the given map and key.
* @return a negative number if quota is already reached
* a positive number if the quota is not reached
* the absolute value of the number is the used quota before this increment request
* 0 if quota usage is not changed
*/
private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
// Only increment job count for first attempt, since job is considered running between retries
// Include the scenario where currentAttempts is 0 (when checked by the scheduler)
// but it will not double increment due to first ensuring that the dag is not already incremented
if (dagNode.getValue().getCurrentAttempts() > 1) {
return 0;
}
Integer currentCount;
// Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user
do {
currentCount = quotaMap.get(key);
} while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
if (currentCount == null) {
currentCount = 0;
}
if (currentCount >= quotaForKey) {
return -currentCount; // increment must have crossed the quota
} else {
return currentCount;
}
}
/**
* Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
* Returns true if the dag existed in the set of running dags and was removed successfully
*/
public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
if (val == null) {
return false;
}
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
}
String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementQuotaUsage(flowGroupToJobCount, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
if (serializedRequesters != null) {
try {
for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
decrementQuotaUsage(requesterToJobCount, requesterKey);
}
} catch (IOException e) {
log.error("Failed to release quota for requester list " + serializedRequesters, e);
return false;
}
}
return true;
}
private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
Integer currentCount;
if (key == null) {
return;
}
do {
currentCount = quotaMap.get(key);
} while (currentCount != null && currentCount > 0 && !quotaMap.replace(key, currentCount, currentCount - 1));
}
private void decrementQuotaUsageForUsers(Set<String> requestersToDecreaseCount) {
for (String requester : requestersToDecreaseCount) {
decrementQuotaUsage(requesterToJobCount, requester);
}
}
private int getQuotaForUser(String user) {
return this.perUserQuota.getOrDefault(user, defaultQuota);
}
private int getQuotaForFlowGroup(String flowGroup) {
return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
}
private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) {
return this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
}
}