blob: 8a05662c96c96adeb36b928faac2b2195830b1c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.schedule;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
/**
* Assignment of one topology, stored in /ZK-DIR/assignments/{topologyid}.
* <ul>
* <li>nodeHost: {supervisorid: hostname} -- map of the assigned supervisors</li>
* <li>taskStartTimeSecs: {taskid: taskStartSeconds}</li>
* <li>masterCodeDir: the topology source code's directory in Nimbus</li>
* <li>taskToResource: {taskid: ResourceAssignment}</li>
* </ul>
*
* @author Lixin/Longda
*/
public class Assignment implements Serializable {
    /** Kind of operation an assignment is tagged with; new instances start as {@link #Assign}. */
    public enum AssignmentType {
        Assign, UpdateTopology, ScaleTopology
    }

    private static final long serialVersionUID = 6087667851333314069L;

    private final String masterCodeDir;
    /**
     * Maps {@code supervisorId} to {@code hostname}. Note kept from the original author: storing this map wastes some
     * ZK storage.
     */
    private final Map<String, String> nodeHost;
    private final Map<Integer, Integer> taskStartTimeSecs;
    private final Set<ResourceWorkerSlot> workers;
    private long timeStamp;
    private AssignmentType type;

    /**
     * Creates an empty assignment: {@code null} master code dir, empty mutable collections, type
     * {@link AssignmentType#Assign} and the current time as timestamp.
     */
    public Assignment() {
        this(null, new HashSet<ResourceWorkerSlot>(), new HashMap<String, String>(), new HashMap<Integer, Integer>());
    }

    /**
     * Creates an assignment of type {@link AssignmentType#Assign} stamped with the current time.
     * The collections are stored by reference, not copied.
     *
     * @param masterCodeDir value later returned by {@link #getMasterCodeDir()}
     * @param workers worker slots of this assignment
     * @param nodeHost map of supervisor id to hostname
     * @param taskStartTimeSecs map of task id to task start time in seconds
     */
    public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers, Map<String, String> nodeHost, Map<Integer, Integer> taskStartTimeSecs) {
        this.workers = workers;
        this.nodeHost = nodeHost;
        this.taskStartTimeSecs = taskStartTimeSecs;
        this.masterCodeDir = masterCodeDir;
        this.timeStamp = System.currentTimeMillis();
        this.type = AssignmentType.Assign;
    }

    public void setAssignmentType(AssignmentType type) {
        this.type = type;
    }

    public AssignmentType getAssignmentType() {
        return type;
    }

    /** @return the stored supervisor-id to hostname map itself (not a copy) */
    public Map<String, String> getNodeHost() {
        return nodeHost;
    }

    /** @return the stored task-id to start-time map itself (not a copy) */
    public Map<Integer, Integer> getTaskStartTimeSecs() {
        return taskStartTimeSecs;
    }

    public String getMasterCodeDir() {
        return masterCodeDir;
    }

    /** @return the stored worker set itself (not a copy) */
    public Set<ResourceWorkerSlot> getWorkers() {
        return workers;
    }

    /**
     * Finds the workers that belong to one supervisor (node).
     *
     * @param supervisorId id of the supervisor to look up
     * @return a new map of {@code port -> worker} for every worker whose node id equals {@code supervisorId}
     */
    public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode(String supervisorId) {
        Map<Integer, ResourceWorkerSlot> result = new HashMap<Integer, ResourceWorkerSlot>();
        for (ResourceWorkerSlot worker : workers) {
            if (isOnSupervisor(worker, supervisorId)) {
                result.put(worker.getPort(), worker);
            }
        }
        return result;
    }

    /**
     * Collects the task ids of all workers that belong to one supervisor.
     *
     * @param supervisorId id of the supervisor to look up
     * @return a new set with the union of the matching workers' tasks; empty when nothing matches
     */
    public Set<Integer> getCurrentSuperviosrTasks(String supervisorId) {
        Set<Integer> tasks = new HashSet<Integer>();
        for (ResourceWorkerSlot worker : workers) {
            if (isOnSupervisor(worker, supervisorId)) {
                tasks.addAll(worker.getTasks());
            }
        }
        return tasks;
    }

    /**
     * Collects the ports of all workers that belong to one supervisor.
     *
     * @param supervisorId id of the supervisor to look up
     * @return a new set of ports; empty when nothing matches
     */
    public Set<Integer> getCurrentSuperviosrWorkers(String supervisorId) {
        Set<Integer> workerSet = new HashSet<Integer>();
        for (ResourceWorkerSlot worker : workers) {
            if (isOnSupervisor(worker, supervisorId)) {
                workerSet.add(worker.getPort());
            }
        }
        return workerSet;
    }

    /**
     * Looks up the tasks of the worker identified by supervisor id and port.
     *
     * @param supervisorId id of the supervisor the worker runs on
     * @param port port of the worker
     * @return whatever {@code getTasks()} of the first matching worker returns (no copy is made here), or a new empty
     *         set when no worker matches
     */
    public Set<Integer> getCurrentWorkerTasks(String supervisorId, int port) {
        for (ResourceWorkerSlot worker : workers) {
            if (isOnSupervisor(worker, supervisorId) && worker.getPort() == port) {
                return worker.getTasks();
            }
        }
        return new HashSet<Integer>();
    }

    /**
     * Finds the worker that runs a given task.
     *
     * @param taskId id of the task
     * @return the first worker whose task set contains {@code taskId}, or {@code null} when there is none
     */
    public ResourceWorkerSlot getWorkerByTaskId(Integer taskId) {
        for (ResourceWorkerSlot worker : workers) {
            if (worker.getTasks().contains(taskId)) {
                return worker;
            }
        }
        return null;
    }

    public long getTimeStamp() {
        return this.timeStamp;
    }

    /**
     * Tells whether this assignment is a newer topology update or scale operation.
     *
     * @param oldTimeStamp timestamp to compare against
     * @return {@code true} only when this assignment's timestamp is greater than {@code oldTimeStamp} and its type is
     *         {@link AssignmentType#UpdateTopology} or {@link AssignmentType#ScaleTopology}
     */
    public boolean isTopologyChange(long oldTimeStamp) {
        // Enum constants are compared by identity so that a null type (setAssignmentType accepts null)
        // yields false instead of a NullPointerException.
        return timeStamp > oldTimeStamp
                && (type == AssignmentType.UpdateTopology || type == AssignmentType.ScaleTopology);
    }

    /** Sets the timestamp to the current time. */
    public void updateTimeStamp() {
        timeStamp = System.currentTimeMillis();
    }

    /**
     * Hash over the same fields that {@link #equals(Object)} compares; the assignment type is not part of it.
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + hashOf(masterCodeDir);
        result = prime * result + hashOf(nodeHost);
        result = prime * result + hashOf(taskStartTimeSecs);
        result = prime * result + hashOf(workers);
        // Fold both halves of the long into the hash. The former "timeStamp & 0xFFFFFFFF" masked nothing:
        // the int literal is -1 and widens to -1L, so the upper 32 bits were simply discarded by the cast.
        result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
        return result;
    }

    /**
     * Two assignments are equal when master code dir, node/host map, task start times, workers and timestamp are
     * all equal. The assignment type is not compared.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Assignment other = (Assignment) obj;
        return sameOrEqual(masterCodeDir, other.masterCodeDir)
                && sameOrEqual(nodeHost, other.nodeHost)
                && sameOrEqual(taskStartTimeSecs, other.taskStartTimeSecs)
                && sameOrEqual(workers, other.workers)
                && timeStamp == other.timeStamp;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }

    /**
     * Null-safe node match: true when {@code supervisorId} is non-null and equals the worker's node id. A worker
     * whose node id is null never matches (instead of raising a NullPointerException).
     */
    private static boolean isOnSupervisor(ResourceWorkerSlot worker, String supervisorId) {
        return supervisorId != null && supervisorId.equals(worker.getNodeId());
    }

    /** Hash code of {@code value}, or 0 for null. */
    private static int hashOf(Object value) {
        return value == null ? 0 : value.hashCode();
    }

    /** Null-safe equality: both null, or {@code left.equals(right)}. */
    private static boolean sameOrEqual(Object left, Object right) {
        return left == null ? right == null : left.equals(right);
    }
}