blob: e06abf80b11b61e3ff4bcbb6f7c127e4c310c00c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
//TODO: improve this by maintaining slot -> executors as well for more efficient operations
/**
 * Mutable {@link SchedulerAssignment} backed by a single executor-to-slot map.
 *
 * <p>All slot-oriented operations ({@link #getSlots()}, {@link #unassignBySlot(WorkerSlot)},
 * {@link #isSlotOccupied(WorkerSlot)}) scan the map's values, so they are linear in the
 * number of assigned executors.
 */
public class SchedulerAssignmentImpl implements SchedulerAssignment {
    /**
     * topology-id this assignment is for.
     */
    String topologyId;

    /**
     * assignment detail, a mapping from executor to <code>WorkerSlot</code>
     */
    Map<ExecutorDetails, WorkerSlot> executorToSlot;

    /**
     * Creates an assignment for the given topology.
     *
     * @param topologyId id of the topology this assignment belongs to
     * @param executorToSlots initial executor-to-slot mapping; its entries are copied into a
     *        new map, so later changes to the argument are not reflected here.
     *        <code>null</code> is treated as an empty mapping.
     */
    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
        this.topologyId = topologyId;
        if (executorToSlots == null) {
            this.executorToSlot = new HashMap<>();
        } else {
            // Copy-construct so the table is sized for the initial entries up front.
            this.executorToSlot = new HashMap<>(executorToSlots);
        }
    }

    /**
     * @return a new set holding every distinct slot that currently has at least one executor
     *         mapped to it; changes to the returned set do not affect this assignment
     */
    @Override
    public Set<WorkerSlot> getSlots() {
        return new HashSet<>(executorToSlot.values());
    }

    /**
     * Assign the slot to executors. An executor that is already mapped is re-mapped to
     * <code>slot</code>.
     *
     * @param slot the slot the executors are placed on
     * @param executors the executors to map to <code>slot</code>
     */
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
        for (ExecutorDetails executor : executors) {
            this.executorToSlot.put(executor, slot);
        }
    }

    /**
     * Release the slot occupied by this assignment: every executor currently mapped to
     * <code>slot</code> is removed from the assignment. Does nothing when no executor is
     * mapped to <code>slot</code>.
     *
     * @param slot the slot to release
     */
    public void unassignBySlot(WorkerSlot slot) {
        // Single pass over the entries; Iterator.remove() is the safe way to drop entries
        // while iterating the map.
        Iterator<Map.Entry<ExecutorDetails, WorkerSlot>> entries = this.executorToSlot.entrySet().iterator();
        while (entries.hasNext()) {
            WorkerSlot assigned = entries.next().getValue();
            // Null-safe comparison: the map may hold a null slot value, and
            // isSlotOccupied (containsValue) already tolerates null.
            boolean matches = (assigned == null) ? (slot == null) : assigned.equals(slot);
            if (matches) {
                entries.remove();
            }
        }
    }

    /**
     * @param slot the slot to look for
     * @return true if slot is occupied by this assignment, i.e. at least one executor is
     *         mapped to it
     */
    public boolean isSlotOccupied(WorkerSlot slot) {
        return this.executorToSlot.containsValue(slot);
    }

    /**
     * @param executor the executor to look for
     * @return true if the executor has an entry in this assignment
     */
    public boolean isExecutorAssigned(ExecutorDetails executor) {
        return this.executorToSlot.containsKey(executor);
    }

    /**
     * @return the id of the topology this assignment is for
     */
    public String getTopologyId() {
        return this.topologyId;
    }

    /**
     * @return the internal executor-to-slot map itself (not a copy); changes made through
     *         the returned map change this assignment
     */
    public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
        return this.executorToSlot;
    }

    /**
     * @return the executors covered by this assignment, as the key-set view of the internal
     *         map (not a copy)
     */
    public Set<ExecutorDetails> getExecutors() {
        return this.executorToSlot.keySet();
    }
}