/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos;

import org.apache.shardingsphere.elasticjob.cloud.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.context.JobContext;
import org.apache.shardingsphere.elasticjob.cloud.context.TaskContext;

import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMAssignmentResult;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/**
 * Tasks waiting to be launched.
 *
 * <p>Builds Fenzo task requests from the eligible job contexts and checks the integrity of the resulting assignments.</p>
 */
@Slf4j
public final class LaunchingTasks {

    private final Map<String, JobContext> eligibleJobContextsMap;
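
    /**
     * Index the eligible job contexts by job name.
     *
     * @param eligibleJobContexts job contexts eligible for launching
     */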
    public LaunchingTasks(final Collection<JobContext> eligibleJobContexts) {
        eligibleJobContextsMap = new HashMap<>(eligibleJobContexts.size(), 1);
        for (JobContext each : eligibleJobContexts) {
            eligibleJobContextsMap.put(each.getJobConfig().getJobName(), each);
        }
    }
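
    /**
     * Get task requests for every sharding item of each eligible job.
     *
     * @return pending task requests
     */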
    List<TaskRequest> getPendingTasks() {
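        // Pre-size the list on the assumption of roughly ten sharding items per job.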
        List<TaskRequest> result = new ArrayList<>(eligibleJobContextsMap.size() * 10);
        for (JobContext each : eligibleJobContextsMap.values()) {
            result.addAll(createTaskRequests(each));
        }
        return result;
    }
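
    /**
     * Create one task request per sharding item assigned to the job.
     */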
    private Collection<TaskRequest> createTaskRequests(final JobContext jobContext) {
        Collection<TaskRequest> result = new ArrayList<>(jobContext.getAssignedShardingItems().size());
        for (int each : jobContext.getAssignedShardingItems()) {
            result.add(new JobTaskRequest(new TaskContext(jobContext.getJobConfig().getJobName(), Collections.singletonList(each), jobContext.getType()), jobContext.getJobConfig()));
        }
        return result;
    }
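
    /**
     * Find jobs whose assignments are incomplete, that is, fewer tasks were assigned than the configured sharding total count.
     * Failover executions are skipped because they launch individual sharding items rather than the full set.
     */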
    Collection<String> getIntegrityViolationJobs(final Collection<VMAssignmentResult> vmAssignmentResults) {
        Map<String, Integer> assignedJobShardingTotalCountMap = getAssignedJobShardingTotalCountMap(vmAssignmentResults);
        Collection<String> result = new HashSet<>(assignedJobShardingTotalCountMap.size(), 1);
        for (Map.Entry<String, Integer> entry : assignedJobShardingTotalCountMap.entrySet()) {
            JobContext jobContext = eligibleJobContextsMap.get(entry.getKey());
            if (ExecutionType.FAILOVER != jobContext.getType() && !entry.getValue().equals(jobContext.getJobConfig().getTypeConfig().getCoreConfig().getShardingTotalCount())) {
                log.warn("Job {} is not assigned at this time, because there are not enough resources to run all of its sharding instances.", entry.getKey());
                result.add(entry.getKey());
            }
        }
        return result;
    }
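
    /**
     * Count how many tasks were assigned to each job across all VM assignment results.
     */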
    private Map<String, Integer> getAssignedJobShardingTotalCountMap(final Collection<VMAssignmentResult> vmAssignmentResults) {
        Map<String, Integer> result = new HashMap<>(eligibleJobContextsMap.size(), 1);
        for (VMAssignmentResult vmAssignmentResult : vmAssignmentResults) {
            for (TaskAssignmentResult tasksAssigned : vmAssignmentResult.getTasksAssigned()) {
                String jobName = TaskContext.from(tasksAssigned.getTaskId()).getMetaInfo().getJobName();
                result.merge(jobName, 1, Integer::sum);
            }
        }
        return result;
    }
}