blob: e00f3a061e1591f92df68192bd1b054879567094 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.resourceestimator.skylinestore.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.DuplicateRecurrenceIdException;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.EmptyResourceSkylineException;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.RecurrenceIdNotFoundException;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
import org.apache.hadoop.resourceestimator.skylinestore.validator.SkylineStoreValidator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An in-memory implementation of {@link SkylineStore}.
 * <p>
 * State is kept in two {@link HashMap}s guarded by a single
 * {@link ReentrantReadWriteLock}: mutating operations hold the write lock and
 * queries hold the read lock. Query results are snapshots taken while the
 * read lock is held, so callers never receive a reference to the internal
 * map or to the internal history lists.
 * <p>
 * Every public method first passes its arguments to a
 * {@link SkylineStoreValidator} before taking any lock; what exactly the
 * validator rejects is defined in that class, not here.
 */
public class InMemoryStore implements SkylineStore {
  private static final Logger LOGGER =
      LoggerFactory.getLogger(InMemoryStore.class);
  private final ReentrantReadWriteLock readWriteLock =
      new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();
  private final SkylineStoreValidator inputValidator =
      new SkylineStoreValidator();
  /**
   * A pipeline job's history {@link ResourceSkyline}s. TODO: we may flatten it
   * out for quick access.
   */
  private final Map<RecurrenceId, List<ResourceSkyline>> skylineStore =
      new HashMap<>(); // pipelineId, resource skyline
  // Recurring pipeline's predicted {@link ResourceSkyline}s.
  private final Map<String, RLESparseResourceAllocation> estimationStore =
      new HashMap<>(); // pipelineId, ResourceSkyline

  /**
   * Copies the non-null elements of the given list into a new list.
   *
   * @param resourceSkylines the list to filter; it is not modified.
   * @return a new mutable list holding the non-null elements in their
   *         original order.
   */
  private static List<ResourceSkyline> eliminateNull(
      final List<ResourceSkyline> resourceSkylines) {
    final List<ResourceSkyline> result =
        new ArrayList<>(resourceSkylines.size());
    for (final ResourceSkyline resourceSkyline : resourceSkylines) {
      if (resourceSkyline != null) {
        result.add(resourceSkyline);
      }
    }
    return result;
  }

  /**
   * Adds job history for a recurrence. Null elements of the input are
   * dropped; if nothing remains the call is a no-op. When the recurrence
   * already has history, the new skylines are appended to it.
   *
   * @param recurrenceId     the recurrence the skylines belong to.
   * @param resourceSkylines the skylines to add.
   * @throws DuplicateRecurrenceIdException if any new skyline has the same
   *           job id as a skyline already stored for {@code recurrenceId};
   *           nothing is added in that case.
   * @throws SkylineStoreException if the validator rejects the arguments.
   */
  @Override public final void addHistory(final RecurrenceId recurrenceId,
      final List<ResourceSkyline> resourceSkylines)
      throws SkylineStoreException {
    inputValidator.validate(recurrenceId, resourceSkylines);
    writeLock.lock();
    try {
      // remove the null elements in the resourceSkylines
      final List<ResourceSkyline> filteredInput =
          eliminateNull(resourceSkylines);
      if (filteredInput.isEmpty()) {
        return;
      }
      final List<ResourceSkyline> jobHistory = skylineStore.get(recurrenceId);
      if (jobHistory == null) {
        skylineStore.put(recurrenceId, filteredInput);
      } else {
        // Reject the whole batch before touching the store if any incoming
        // job id is already recorded for this recurrence.
        final Set<String> oldJobIds = new HashSet<>();
        for (final ResourceSkyline resourceSkyline : jobHistory) {
          oldJobIds.add(resourceSkyline.getJobId());
        }
        for (final ResourceSkyline elem : filteredInput) {
          if (oldJobIds.contains(elem.getJobId())) {
            final String errMsg =
                "Trying to addHistory duplicate resource skylines for "
                    + recurrenceId
                    + ". Use updateHistory function instead.";
            LOGGER.error(errMsg);
            throw new DuplicateRecurrenceIdException(errMsg);
          }
        }
        jobHistory.addAll(filteredInput);
      }
      LOGGER.info("Successfully addHistory new resource skylines for {}.",
          recurrenceId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Stores the estimated resource allocation for a pipeline, replacing any
   * estimation previously stored under the same pipeline id.
   *
   * @param pipelineId      the pipeline the estimation belongs to.
   * @param resourceSkyline the estimated allocation.
   * @throws SkylineStoreException if the validator rejects the arguments.
   */
  @Override public void addEstimation(String pipelineId,
      RLESparseResourceAllocation resourceSkyline)
      throws SkylineStoreException {
    inputValidator.validate(pipelineId, resourceSkyline);
    writeLock.lock();
    try {
      estimationStore.put(pipelineId, resourceSkyline);
      LOGGER.info("Successfully add estimated resource allocation for {}.",
          pipelineId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Removes all history stored for a recurrence.
   *
   * @param recurrenceId the recurrence whose history is deleted.
   * @throws RecurrenceIdNotFoundException if no history is stored for
   *           {@code recurrenceId}.
   * @throws SkylineStoreException if the validator rejects the argument.
   */
  @Override public final void deleteHistory(final RecurrenceId recurrenceId)
      throws SkylineStoreException {
    inputValidator.validate(recurrenceId);
    writeLock.lock();
    try {
      if (!skylineStore.containsKey(recurrenceId)) {
        final String errMsg =
            "Trying to deleteHistory non-existing recurring pipeline "
                + recurrenceId + "'s resource skylines";
        LOGGER.error(errMsg);
        throw new RecurrenceIdNotFoundException(errMsg);
      }
      skylineStore.remove(recurrenceId);
      LOGGER.warn("Delete resource skylines for {}.", recurrenceId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Replaces the history stored for an existing recurrence with the
   * non-null elements of the given list.
   *
   * @param recurrenceId     the recurrence whose history is replaced.
   * @param resourceSkylines the new history.
   * @throws RecurrenceIdNotFoundException if no history is stored for
   *           {@code recurrenceId}.
   * @throws EmptyResourceSkylineException if the input has no non-null
   *           element; the stored history is left unchanged.
   * @throws SkylineStoreException if the validator rejects the arguments.
   */
  @Override public final void updateHistory(final RecurrenceId recurrenceId,
      final List<ResourceSkyline> resourceSkylines)
      throws SkylineStoreException {
    inputValidator.validate(recurrenceId, resourceSkylines);
    writeLock.lock();
    try {
      if (!skylineStore.containsKey(recurrenceId)) {
        final String errMsg =
            "Trying to updateHistory non-existing resource skylines for "
                + recurrenceId;
        LOGGER.error(errMsg);
        throw new RecurrenceIdNotFoundException(errMsg);
      }
      // remove the null elements in the resourceSkylines
      final List<ResourceSkyline> filteredInput =
          eliminateNull(resourceSkylines);
      if (filteredInput.isEmpty()) {
        final String errMsg = "Trying to updateHistory " + recurrenceId
            + " with empty resource skyline";
        LOGGER.error(errMsg);
        throw new EmptyResourceSkylineException(errMsg);
      }
      skylineStore.put(recurrenceId, filteredInput);
      LOGGER.info("Successfully updateHistory resource skylines for {}.",
          recurrenceId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Queries stored history. Three query shapes are supported:
   * <ul>
   * <li>pipeline id {@code "*"}: every entry in the store (possibly an
   * empty map);</li>
   * <li>run id {@code "*"}: every entry whose pipeline id equals the
   * requested one;</li>
   * <li>otherwise: the single entry keyed by {@code recurrenceId}.</li>
   * </ul>
   * The result is a snapshot built while the read lock is held: an
   * unmodifiable map whose values are fresh copies of the stored lists, so
   * later store mutations are not visible through it and changes to the
   * returned lists do not affect the store.
   *
   * @param recurrenceId the query key, with {@code "*"} as wildcard.
   * @return the matching history, or {@code null} when a pipeline-wide or
   *         exact query matches nothing.
   * @throws SkylineStoreException if the validator rejects the argument.
   */
  @Override public final Map<RecurrenceId, List<ResourceSkyline>> getHistory(
      final RecurrenceId recurrenceId) throws SkylineStoreException {
    inputValidator.validate(recurrenceId);
    readLock.lock();
    try {
      final String pipelineId = recurrenceId.getPipelineId();
      final Map<RecurrenceId, List<ResourceSkyline>> result = new HashMap<>();
      // User tries to getHistory all resource skylines in the skylineStore
      if (pipelineId.equals("*")) {
        // Copy instead of wrapping the live map: a view would be read by the
        // caller after the read lock has been released.
        for (final Map.Entry<RecurrenceId, List<ResourceSkyline>> entry
            : skylineStore.entrySet()) {
          result.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        LOGGER
            .info("Successfully query resource skylines for {}.", recurrenceId);
        return Collections.unmodifiableMap(result);
      }
      // User tries to getHistory pipelineId's all resource skylines in the
      // skylineStore
      if (recurrenceId.getRunId().equals("*")) {
        // TODO: this for loop is expensive, so we may change the type of
        // skylineStore to speed up this loop.
        for (final Map.Entry<RecurrenceId, List<ResourceSkyline>> entry
            : skylineStore.entrySet()) {
          final RecurrenceId index = entry.getKey();
          if (index.getPipelineId().equals(pipelineId)) {
            result.put(index, new ArrayList<>(entry.getValue()));
          }
        }
      } else {
        // User tries to getHistory {pipelineId, runId}'s resource skylines
        final List<ResourceSkyline> jobHistory =
            skylineStore.get(recurrenceId);
        if (jobHistory != null) {
          result.put(recurrenceId, new ArrayList<>(jobHistory));
        }
      }
      if (result.isEmpty()) {
        LOGGER.warn(
            "Trying to getHistory non-existing resource skylines for {}.",
            recurrenceId);
        return null;
      }
      LOGGER.info("Successfully query resource skylines for {}.", recurrenceId);
      return Collections.unmodifiableMap(result);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Looks up the estimated resource allocation stored for a pipeline.
   *
   * @param pipelineId the pipeline to look up.
   * @return the stored estimation, or {@code null} if none is stored under
   *         {@code pipelineId}.
   * @throws SkylineStoreException if the validator rejects the argument.
   */
  @Override public final RLESparseResourceAllocation getEstimation(
      String pipelineId) throws SkylineStoreException {
    inputValidator.validate(pipelineId);
    readLock.lock();
    try {
      return estimationStore.get(pipelineId);
    } finally {
      readLock.unlock();
    }
  }
}