// blob: ee41630faf2e6485463f5e0ae35273a35af7b97d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.storm.generated.WorkerResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Mutable {@link SchedulerAssignment} backed by plain {@link HashMap}s.
 *
 * <p>The class keeps two views of the same assignment: a forward map from executor to slot and a reverse
 * map from slot to the executors placed on it. Every mutator in this class updates both so they stay in
 * agreement. The class performs no synchronization of its own, and several getters hand out the live
 * internal maps rather than copies, so changes made through those maps bypass this bookkeeping.
 */
public class SchedulerAssignmentImpl implements SchedulerAssignment {
    private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
    /**
     * Factory used with {@link Map#computeIfAbsent} to create the per-slot executor collection.
     */
    private static final Function<WorkerSlot, Collection<ExecutorDetails>> MAKE_LIST = (k) -> new LinkedList<>();
    /**
     * topology-id this assignment is for.
     */
    private final String topologyId;
    /**
     * assignment detail, a mapping from executor to <code>WorkerSlot</code>.
     */
    private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
    /**
     * Resources recorded for each slot; a slot has no entry when no resources were supplied for it.
     */
    private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
    /**
     * Node id to the total shared off-heap value recorded for that node.
     */
    private final Map<String, Double> nodeIdToTotalSharedOffHeapNode = new HashMap<>();
    /**
     * Reverse index of {@link #executorToSlot}: the executors currently placed on each slot.
     */
    private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();

    /**
     * Create a new assignment.
     *
     * <p>The contents of the supplied maps are copied; the maps themselves are not retained.
     *
     * @param topologyId the id of the topology the assignment is for.
     * @param executorToSlot the executor to slot mapping for the assignment. Can be null and set through other methods later.
     * @param resources the resources for the current assignments. Can be null and set through other methods later.
     * @param nodeIdToTotalSharedOffHeap the shared memory for this assignment. Can be null and set through other methods later.
     * @throws RuntimeException if any supplied map contains a null key or a null value.
     */
    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlot,
                                   Map<WorkerSlot, WorkerResources> resources, Map<String, Double> nodeIdToTotalSharedOffHeap) {
        this.topologyId = topologyId;
        if (executorToSlot != null) {
            if (hasNullEntry(executorToSlot)) {
                throw new RuntimeException("Cannot create a scheduling with a null in it " + executorToSlot);
            }
            this.executorToSlot.putAll(executorToSlot);
            // Build the reverse index from the same entries so both views start out consistent.
            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
                slotToExecutors.computeIfAbsent(entry.getValue(), MAKE_LIST).add(entry.getKey());
            }
        }
        if (resources != null) {
            if (hasNullEntry(resources)) {
                throw new RuntimeException("Cannot create resources with a null in it " + resources);
            }
            this.resources.putAll(resources);
        }
        if (nodeIdToTotalSharedOffHeap != null) {
            if (hasNullEntry(nodeIdToTotalSharedOffHeap)) {
                throw new RuntimeException("Cannot create off heap with a null in it " + nodeIdToTotalSharedOffHeap);
            }
            this.nodeIdToTotalSharedOffHeapNode.putAll(nodeIdToTotalSharedOffHeap);
        }
    }

    /**
     * Create an empty assignment for the given topology.
     *
     * @param topologyId the id of the topology the assignment is for.
     */
    public SchedulerAssignmentImpl(String topologyId) {
        this(topologyId, null, null, null);
    }

    /**
     * Create a copy of another assignment. The maps are copied, the keys and values inside them are shared.
     *
     * @param assignment the assignment to copy; must not be null.
     */
    public SchedulerAssignmentImpl(SchedulerAssignment assignment) {
        this(assignment.getTopologyId(), assignment.getExecutorToSlot(),
             assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapNodeMemory());
    }

    /**
     * Tell whether a map holds a null key or a null value.
     */
    private static <K, V> boolean hasNullEntry(Map<K, V> map) {
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (entry.getKey() == null || entry.getValue() == null) {
                return true;
            }
        }
        return false;
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + " topo: " + topologyId + " execToSlots: " + executorToSlot;
    }

    /**
     * Like the equals command, but ignores the resources.
     *
     * @param other the object to check for equality against.
     * @return true if they are equal, ignoring resources, else false.
     */
    public boolean equalsIgnoreResources(Object other) {
        if (other == this) {
            return true;
        }
        if (!(other instanceof SchedulerAssignmentImpl)) {
            return false;
        }
        SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
        // hashCode() tolerates a null topologyId, so the comparison has to as well.
        boolean sameTopology = (topologyId == null) ? o.topologyId == null : topologyId.equals(o.topologyId);
        return sameTopology && executorToSlot.equals(o.executorToSlot);
    }

    /**
     * Hash over the topology id and the executor to slot mapping only. Resources are left out, which keeps
     * the hash consistent with both {@link #equals(Object)} and {@link #equalsIgnoreResources(Object)}.
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((topologyId == null) ? 0 : topologyId.hashCode());
        result = prime * result + executorToSlot.hashCode();
        return result;
    }

    /**
     * Full equality: everything {@link #equalsIgnoreResources(Object)} checks, plus the per-slot resources
     * and the per-node shared off-heap values.
     */
    @Override
    public boolean equals(Object other) {
        if (!equalsIgnoreResources(other)) {
            return false;
        }
        // equalsIgnoreResources only returns true for a SchedulerAssignmentImpl, so the cast is safe.
        SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
        return resources.equals(o.resources)
               && nodeIdToTotalSharedOffHeapNode.equals(o.nodeIdToTotalSharedOffHeapNode);
    }

    /**
     * Get the slots that have at least one executor mapped to them.
     *
     * @return a new set; changing it does not affect this assignment.
     */
    @Override
    public Set<WorkerSlot> getSlots() {
        return new HashSet<>(executorToSlot.values());
    }

    /**
     * Assign the slot to executors without recording resources. Any resources previously recorded for the
     * slot are removed.
     *
     * @param slot the slot to place the executors on.
     * @param executors the executors to place on the slot.
     * @deprecated use {@link #assign(WorkerSlot, Collection, WorkerResources)} instead.
     */
    @Deprecated
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
        assign(slot, executors, null);
    }

    /**
     * Assign the slot to executors.
     *
     * <p>An executor that is already placed on a different slot is moved: it is taken off its previous slot,
     * and a previous slot left without executors is released as by {@link #unassignBySlot(WorkerSlot)}.
     * An executor that is already on {@code slot} is left where it is and is not listed twice.
     *
     * @param slot the slot to place the executors on; must not be null.
     * @param executors the executors to place on the slot; must not be null.
     * @param slotResources the resources to record for the slot, or null to clear any recorded resources.
     */
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
        assert slot != null;
        // Snapshot the input first: a caller may pass a live view of one of our own collections
        // (e.g. getSlotToExecutors().get(otherSlot)), which the bookkeeping below would otherwise
        // modify while it is being iterated.
        Collection<ExecutorDetails> toAssign = new LinkedList<>(executors);
        Collection<ExecutorDetails> onSlot = slotToExecutors.computeIfAbsent(slot, MAKE_LIST);
        for (ExecutorDetails executor : toAssign) {
            WorkerSlot previous = this.executorToSlot.put(executor, slot);
            if (previous != null) {
                if (previous.equals(slot)) {
                    // Already recorded on this slot; adding it again would create a duplicate entry.
                    continue;
                }
                detachFromSlot(previous, executor);
            }
            onSlot.add(executor);
        }
        if (slotResources != null) {
            resources.put(slot, slotResources);
        } else {
            resources.remove(slot);
        }
    }

    /**
     * Remove an executor from the reverse index of the slot it used to occupy, releasing the slot when it
     * ends up without executors.
     */
    private void detachFromSlot(WorkerSlot previous, ExecutorDetails executor) {
        Collection<ExecutorDetails> previousExecutors = slotToExecutors.get(previous);
        if (previousExecutors == null) {
            return;
        }
        previousExecutors.remove(executor);
        if (previousExecutors.isEmpty()) {
            // Nothing is left on the old slot: drop its entry, its resources and, if the node is now unused,
            // the node's shared off-heap value.
            unassignBySlot(previous);
        }
    }

    /**
     * Release the slot occupied by this assignment.
     *
     * <p>Removes every executor placed on the slot and the resources recorded for it. If no remaining
     * executor is placed on the slot's node, the shared off-heap value recorded for that node is removed too.
     *
     * @param slot the slot to release; must not be null.
     */
    public void unassignBySlot(WorkerSlot slot) {
        Collection<ExecutorDetails> executors = slotToExecutors.remove(slot);
        if (executors != null) {
            for (ExecutorDetails executor : executors) {
                executorToSlot.remove(executor);
            }
        }
        resources.remove(slot);

        // The node-level value only stays while some executor is still placed on that node.
        String node = slot.getNodeId();
        boolean isFound = false;
        for (WorkerSlot ws : executorToSlot.values()) {
            if (node.equals(ws.getNodeId())) {
                isFound = true;
                break;
            }
        }
        if (!isFound) {
            nodeIdToTotalSharedOffHeapNode.remove(node);
        }
    }

    /**
     * Tell whether the slot has an entry in the slot to executors index.
     */
    @Override
    public boolean isSlotOccupied(WorkerSlot slot) {
        return this.slotToExecutors.containsKey(slot);
    }

    /**
     * Tell whether the executor is mapped to a slot.
     */
    @Override
    public boolean isExecutorAssigned(ExecutorDetails executor) {
        return this.executorToSlot.containsKey(executor);
    }

    @Override
    public String getTopologyId() {
        return this.topologyId;
    }

    /**
     * Get the executor to slot mapping.
     *
     * @return the live internal map, not a copy.
     */
    @Override
    public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
        return this.executorToSlot;
    }

    /**
     * Get the assigned executors.
     *
     * @return the key set of the live internal executor to slot map, not a copy.
     */
    @Override
    public Set<ExecutorDetails> getExecutors() {
        return this.executorToSlot.keySet();
    }

    /**
     * Get the slot to executors mapping.
     *
     * @return the live internal map, not a copy.
     */
    @Override
    public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors() {
        return slotToExecutors;
    }

    /**
     * Get the resources recorded per slot.
     *
     * @return the live internal map, not a copy.
     */
    @Override
    public Map<WorkerSlot, WorkerResources> getScheduledResources() {
        return resources;
    }

    /**
     * Record the total shared off-heap value for a node, replacing any earlier value.
     *
     * @param node the id of the node.
     * @param value the value to record for the node.
     */
    public void setTotalSharedOffHeapNodeMemory(String node, double value) {
        nodeIdToTotalSharedOffHeapNode.put(node, value);
    }

    /**
     * Get the total shared off-heap value recorded per node id.
     *
     * @return the live internal map, not a copy.
     */
    @Override
    public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory() {
        return nodeIdToTotalSharedOffHeapNode;
    }
}