blob: 3a58f6526b5664c804d8c7ba37032467c01e9c75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.mesos;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.ignite.mesos.resource.ResourceProvider;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
/**
* Ignite scheduler receives offers from Mesos and decides how many resources will be occupied.
*/
public class IgniteScheduler implements Scheduler {
/** Cpus. */
public static final String CPU = "cpus";
/** Mem. */
public static final String MEM = "mem";
/** Disk. */
public static final String DISK = "disk";
/** Default port range. */
public static final String DEFAULT_PORT = ":47500..47510";
/** Delimiter char. */
public static final String DELIM = ",";
/** Logger. */
private static final Logger log = Logger.getLogger(IgniteScheduler.class.getSimpleName());
/** ID generator. */
private AtomicInteger taskIdGenerator = new AtomicInteger();
/** Task on host. */
private Map<String, IgniteTask> tasks = new HashMap<>();
/** Cluster resources. */
private ClusterProperties clusterProps;
/** Resource provider. */
private ResourceProvider resourceProvider;
/**
* @param clusterProps Cluster limit.
* @param resourceProvider Resource provider.
*/
public IgniteScheduler(ClusterProperties clusterProps, ResourceProvider resourceProvider) {
this.clusterProps = clusterProps;
this.resourceProvider = resourceProvider;
}
/** {@inheritDoc} */
@Override public synchronized void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) {
log.log(Level.FINE, "Offers resources: {0}", offers.size());
for (Protos.Offer offer : offers) {
IgniteTask igniteTask = checkOffer(offer);
// Decline offer which doesn't match by mem or cpu.
if (igniteTask == null) {
schedulerDriver.declineOffer(offer.getId());
continue;
}
// Generate a unique task ID.
Protos.TaskID taskId = Protos.TaskID.newBuilder()
.setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build();
log.log(Level.INFO, "Launching task: {0}", igniteTask);
// Create task to run.
Protos.TaskInfo task = createTask(offer, igniteTask, taskId);
try {
schedulerDriver.launchTasks(Collections.singletonList(offer.getId()),
Collections.singletonList(task),
Protos.Filters.newBuilder().setRefuseSeconds(1).build());
}
catch (RuntimeException e) {
log.log(Level.SEVERE, "Failed launch task. Task id: {0}. Task info: {1}",
new Object[]{taskId, task, e});
throw e;
}
tasks.put(taskId.getValue(), igniteTask);
}
}
/**
* Create Task.
*
* @param offer Offer.
* @param igniteTask Task description.
* @param taskId Task id.
* @return Task.
*/
private Protos.TaskInfo createTask(Protos.Offer offer, IgniteTask igniteTask, Protos.TaskID taskId) {
String cfgUrl = clusterProps.igniteConfigUrl() != null ?
clusterProps.igniteConfigUrl() : resourceProvider.igniteConfigUrl();
Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder()
.setEnvironment(Protos.Environment.newBuilder()
.addVariables(Protos.Environment.Variable.newBuilder()
.setName("IGNITE_TCP_DISCOVERY_ADDRESSES")
.setValue(getAddress(offer.getHostname())))
.addVariables(Protos.Environment.Variable.newBuilder()
.setName("JVM_OPTS")
.setValue(clusterProps.jmvOpts())))
.addUris(Protos.CommandInfo.URI.newBuilder()
.setValue(clusterProps.ignitePackageUrl() != null ?
clusterProps.ignitePackageUrl() : resourceProvider.igniteUrl())
.setExtract(true))
.addUris(Protos.CommandInfo.URI.newBuilder()
.setValue(cfgUrl));
// Collection user's libs.
Collection<String> usersLibs = new ArrayList<>();
if (clusterProps.usersLibsUrl() != null && !clusterProps.usersLibsUrl().isEmpty())
Collections.addAll(usersLibs, clusterProps.usersLibsUrl().split(DELIM));
if (resourceProvider.resourceUrl() != null && !resourceProvider.resourceUrl().isEmpty())
usersLibs.addAll(resourceProvider.resourceUrl());
for (String url : usersLibs)
builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url));
String cfgName = resourceProvider.configName();
if (clusterProps.igniteConfigUrl() != null)
cfgName = fileName(clusterProps.igniteConfigUrl());
String licenceFile = null;
if (clusterProps.licenceUrl() != null)
licenceFile = fileName(clusterProps.licenceUrl());
builder.setValue(
(licenceFile != null ? "find . -maxdepth 1 -name \"" + licenceFile + "\" -exec cp {} ./*/ \\; && " : "")
+ "find . -maxdepth 1 -name \"*.jar\" -exec cp {} ./*/libs/ \\; && "
+ "./*/bin/ignite.sh "
+ cfgName
+ " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m")
+ " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m");
return Protos.TaskInfo.newBuilder()
.setName("Ignite node " + taskId.getValue())
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId())
.setCommand(builder)
.addResources(Protos.Resource.newBuilder()
.setName(CPU)
.setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores())))
.addResources(Protos.Resource.newBuilder()
.setName(MEM)
.setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem())))
.addResources(Protos.Resource.newBuilder()
.setName(DISK)
.setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.disk())))
.build();
}
/**
* @param path Path.
* @return File name.
*/
private String fileName(String path) {
String[] split = path.split("/");
return split[split.length - 1];
}
/**
* @return Address running nodes.
*/
private String getAddress(String address) {
if (tasks.isEmpty()) {
if (address != null && !address.isEmpty())
return address + DEFAULT_PORT;
return "";
}
StringBuilder sb = new StringBuilder();
for (IgniteTask task : tasks.values())
sb.append(task.host()).append(DEFAULT_PORT).append(DELIM);
return sb.substring(0, sb.length() - 1);
}
/**
* Check slave resources and return resources infos.
*
* @param offer Offer request.
* @return Ignite task description.
*/
private IgniteTask checkOffer(Protos.Offer offer) {
// Check limit on running nodes.
if (clusterProps.instances() <= tasks.size())
return null;
double cpus = -1;
double mem = -1;
double disk = -1;
// Check host name
if (clusterProps.hostnameConstraint() != null
&& clusterProps.hostnameConstraint().matcher(offer.getHostname()).matches())
return null;
// Collect resource on slave.
for (Protos.Resource resource : offer.getResourcesList()) {
if (resource.getName().equals(CPU)) {
if (resource.getType().equals(Protos.Value.Type.SCALAR))
cpus = resource.getScalar().getValue();
else
log.log(Level.FINE, "Cpus resource was not a scalar: {0}" + resource.getType());
}
else if (resource.getName().equals(MEM)) {
if (resource.getType().equals(Protos.Value.Type.SCALAR))
mem = resource.getScalar().getValue();
else
log.log(Level.FINE, "Mem resource was not a scalar: {0}", resource.getType());
}
else if (resource.getName().equals(DISK))
if (resource.getType().equals(Protos.Value.Type.SCALAR))
disk = resource.getScalar().getValue();
else
log.log(Level.FINE, "Disk resource was not a scalar: {0}", resource.getType());
}
// Check that slave satisfies min requirements.
if (cpus < clusterProps.minCpuPerNode() || mem < clusterProps.minMemoryPerNode()) {
log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
return null;
}
double totalCpus = 0;
double totalMem = 0;
double totalDisk = 0;
// Collect occupied resources.
for (IgniteTask task : tasks.values()) {
totalCpus += task.cpuCores();
totalMem += task.mem();
totalDisk += task.disk();
}
cpus = Math.min(clusterProps.cpus() - totalCpus, Math.min(cpus, clusterProps.cpusPerNode()));
mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode()));
disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode()));
if ((clusterProps.cpusPerNode() != ClusterProperties.UNLIMITED && clusterProps.cpusPerNode() != cpus)
|| (clusterProps.memoryPerNode() != ClusterProperties.UNLIMITED && clusterProps.memoryPerNode() != mem)) {
log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
return null;
}
if (cpus > 0 && mem > 0)
return new IgniteTask(offer.getHostname(), cpus, mem, disk);
else {
log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
return null;
}
}
/** {@inheritDoc} */
@Override public synchronized void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
final String taskId = taskStatus.getTaskId().getValue();
log.log(Level.INFO, "Received update event task: {0} is in state: {1}",
new Object[]{taskId, taskStatus.getState()});
if (taskStatus.getState().equals(Protos.TaskState.TASK_FAILED)
|| taskStatus.getState().equals(Protos.TaskState.TASK_ERROR)
|| taskStatus.getState().equals(Protos.TaskState.TASK_FINISHED)
|| taskStatus.getState().equals(Protos.TaskState.TASK_KILLED)
|| taskStatus.getState().equals(Protos.TaskState.TASK_LOST)) {
IgniteTask failedTask = tasks.remove(taskId);
if (failedTask != null) {
List<Protos.Request> requests = new ArrayList<>();
Protos.Request request = Protos.Request.newBuilder()
.addResources(Protos.Resource.newBuilder()
.setType(Protos.Value.Type.SCALAR)
.setName(MEM)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
.addResources(Protos.Resource.newBuilder()
.setType(Protos.Value.Type.SCALAR)
.setName(CPU)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
.build();
requests.add(request);
schedulerDriver.requestResources(requests);
}
}
}
/**
* @param clusterProps Cluster properties.
*/
public void setClusterProps(ClusterProperties clusterProps) {
this.clusterProps = clusterProps;
}
/** {@inheritDoc} */
@Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
Protos.MasterInfo masterInfo) {
log.log(Level.INFO, "Scheduler registered. Master: {0}:{1}, framework={2}", new Object[]{masterInfo.getIp(),
masterInfo.getPort(), frameworkID});
}
/** {@inheritDoc} */
@Override public void disconnected(SchedulerDriver schedulerDriver) {
log.info("Scheduler disconnected.");
}
/** {@inheritDoc} */
@Override public void error(SchedulerDriver schedulerDriver, String s) {
log.log(Level.SEVERE, "Failed. Error message: {0}", s);
}
/** {@inheritDoc} */
@Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
Protos.SlaveID slaveID, byte[] bytes) {
// No-op.
}
/** {@inheritDoc} */
@Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
// No-op.
}
/** {@inheritDoc} */
@Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
Protos.SlaveID slaveID, int i) {
// No-op.
}
/** {@inheritDoc} */
@Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
// No-op.
}
/** {@inheritDoc} */
@Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
// No-op.
}
}