blob: 886f2d75743e7a75a89d582c9b49d6a955472f97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.yarn;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.apache.ignite.yarn.utils.IgniteYarnUtils;
/**
* Application master request containers from Yarn and decides how many resources will be occupied.
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** */
public static final Logger log = Logger.getLogger(ApplicationMaster.class.getSimpleName());
/** Default port range. */
public static final String DEFAULT_PORT = ":47500..47510";
/** Delimiter char. */
public static final String DELIM = ",";
/** */
private long schedulerTimeout = TimeUnit.SECONDS.toMillis(1);
/** Yarn configuration. */
private final YarnConfiguration conf;
/** Cluster properties. */
private final ClusterProperties props;
/** Network manager. */
private NMClient nmClient;
/** Resource manager. */
private AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
/** Ignite path. */
private final Path ignitePath;
/** Config path. */
private Path cfgPath;
/** Hadoop file system. */
private FileSystem fs;
/** Buffered tokens to be injected into newly allocated containers. */
private ByteBuffer allTokens;
/** Running containers. */
private final Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>();
/**
* @param ignitePath Hdfs path to ignite.
* @param props Cluster properties.
*/
public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception {
this.conf = new YarnConfiguration();
this.props = props;
this.ignitePath = new Path(ignitePath);
}
/** {@inheritDoc} */
@Override public synchronized void onContainersAllocated(List<Container> conts) {
for (Container c : conts) {
if (checkContainer(c)) {
log.log(Level.INFO, "Container {0} allocated", c.getId());
try {
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
if (UserGroupInformation.isSecurityEnabled())
// Set the tokens to the newly allocated container:
ctx.setTokens(allTokens.duplicate());
Map<String, String> env = new HashMap<>(ctx.getEnvironment());
Map<String, String> systemEnv = System.getenv();
for (String key : systemEnv.keySet()) {
if (key.matches("^IGNITE_[_0-9A-Z]+$"))
env.put(key, systemEnv.get(key));
}
env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
if (props.jvmOpts() != null && !props.jvmOpts().isEmpty())
env.put("JVM_OPTS", props.jvmOpts());
ctx.setEnvironment(env);
Map<String, LocalResource> resources = new HashMap<>();
resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
if (props.licencePath() != null)
resources.put("gridgain-license.xml",
IgniteYarnUtils.setupFile(new Path(props.licencePath()), fs, LocalResourceType.FILE));
if (props.userLibs() != null)
resources.put("libs", IgniteYarnUtils.setupFile(new Path(props.userLibs()), fs,
LocalResourceType.FILE));
ctx.setLocalResources(resources);
ctx.setCommands(
Collections.singletonList(
(props.licencePath() != null ? "cp gridgain-license.xml ./ignite/*/ || true && " : "")
+ "cp -r ./libs/* ./ignite/*/libs/ || true && "
+ "./ignite/*/bin/ignite.sh "
+ "./ignite-config.xml"
+ " -J-Xmx" + ((int)props.memoryPerNode()) + "m"
+ " -J-Xms" + ((int)props.memoryPerNode()) + "m"
+ IgniteYarnUtils.YARN_LOG_OUT
));
log.log(Level.INFO, "Launching container: {0}.", c.getId());
nmClient.startContainer(c, ctx);
containers.put(c.getId(),
new IgniteContainer(
c.getId(),
c.getNodeId(),
c.getResource().getVirtualCores(),
c.getResource().getMemory()));
}
catch (Exception ex) {
log.log(Level.WARNING, "Error launching container " + c.getId(), ex);
}
}
else {
log.log(Level.WARNING, "Container {0} check failed. Releasing...", c.getId());
rmClient.releaseAssignedContainer(c.getId());
}
}
}
/**
* @param cont Container.
* @return {@code True} if container satisfies requirements.
*/
private boolean checkContainer(Container cont) {
// Check limit on running nodes.
if (props.instances() <= containers.size()) {
log.log(Level.WARNING, "Limit on running nodes exceeded. ({0} of {1} max)",
new Object[] {containers.size(), props.instances()});
return false;
}
// Check host name
if (props.hostnameConstraint() != null
&& props.hostnameConstraint().matcher(cont.getNodeId().getHost()).matches()) {
log.log(Level.WARNING, "Wrong host name '{0}'. It didn't match to '{1}' pattern.",
new Object[] {cont.getNodeId().getHost(), props.hostnameConstraint().toString()});
return false;
}
// Check that slave satisfies min requirements.
if (cont.getResource().getVirtualCores() < props.cpusPerNode()
|| cont.getResource().getMemory() < props.totalMemoryPerNode()) {
log.log(Level.WARNING, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}",
new Object[] {cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
cont.getResource().getMemory()});
return false;
}
return true;
}
/**
* @return Address running nodes.
*/
private String getAddress(String addr) {
if (containers.isEmpty()) {
if (addr != null && !addr.isEmpty())
return addr + DEFAULT_PORT;
return "";
}
StringBuilder sb = new StringBuilder();
for (IgniteContainer cont : containers.values())
sb.append(cont.nodeId.getHost()).append(DEFAULT_PORT).append(DELIM);
return sb.substring(0, sb.length() - 1);
}
/** {@inheritDoc} */
@Override public synchronized void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
containers.remove(status.getContainerId());
log.log(Level.INFO, "Container completed. Container id: {0}. State: {1}.",
new Object[]{status.getContainerId(), status.getState()});
}
}
/** {@inheritDoc} */
@Override public synchronized void onNodesUpdated(List<NodeReport> updated) {
for (NodeReport node : updated) {
// If node unusable.
if (node.getNodeState().isUnusable()) {
for (IgniteContainer cont : containers.values()) {
if (cont.nodeId().equals(node.getNodeId())) {
containers.remove(cont.id());
log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.",
new Object[]{node.getNodeId().getHost(), node.getNodeState()});
}
}
log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.",
new Object[]{node.getNodeId().getHost(), node.getNodeState()});
}
}
}
/** {@inheritDoc} */
@Override public void onShutdownRequest() {
// No-op.
}
/** {@inheritDoc} */
@Override public void onError(Throwable t) {
nmClient.stop();
}
/** {@inheritDoc} */
@Override public float getProgress() {
return 50;
}
/**
* @param args Args.
* @throws Exception If failed.
*/
public static void main(String[] args) throws Exception {
ClusterProperties props = ClusterProperties.from();
ApplicationMaster master = new ApplicationMaster(args[0], props);
master.init();
master.run();
}
/**
* Runs application master.
*
* @throws Exception If failed.
*/
public void run() throws Exception {
// Register with ResourceManager
rmClient.registerApplicationMaster("", 0, "");
log.log(Level.INFO, "Application master registered.");
// Priority for worker containers - priorities are intra-application
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(0);
try {
// Check ignite cluster.
while (!nmClient.isInState(Service.STATE.STOPPED)) {
int runningCnt = containers.size();
if (runningCnt < props.instances() && checkAvailableResource()) {
// Resource requirements for worker containers.
Resource capability = Records.newRecord(Resource.class);
capability.setMemory((int)props.totalMemoryPerNode());
capability.setVirtualCores((int)props.cpusPerNode());
for (int i = 0; i < props.instances() - runningCnt; ++i) {
// Make container requests to ResourceManager
AMRMClient.ContainerRequest containerAsk =
new AMRMClient.ContainerRequest(capability, null, null, priority);
rmClient.addContainerRequest(containerAsk);
log.log(Level.INFO, "Making request. Memory: {0}, cpu {1}.",
new Object[]{props.totalMemoryPerNode(), props.cpusPerNode()});
}
}
TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
}
}
catch (InterruptedException ignored) {
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
log.log(Level.WARNING, "Application master killed.");
}
catch (Exception e) {
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", "");
log.log(Level.SEVERE, "Application master failed.", e);
}
}
/**
* @return {@code True} if cluster contains available resources.
*/
private boolean checkAvailableResource() {
Resource availableRes = rmClient.getAvailableResources();
return availableRes == null || availableRes.getMemory() >= props.totalMemoryPerNode()
&& availableRes.getVirtualCores() >= props.cpusPerNode();
}
/**
* @throws IOException
*/
public void init() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
Credentials cred = UserGroupInformation.getCurrentUser().getCredentials();
allTokens = IgniteYarnUtils.createTokenBuffer(cred);
}
fs = FileSystem.get(conf);
nmClient = NMClient.createNMClient();
nmClient.init(conf);
nmClient.start();
// Create async application master.
rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
rmClient.init(conf);
rmClient.start();
if (props.igniteCfg() == null || props.igniteCfg().isEmpty()) {
InputStream input = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
cfgPath = new Path(props.igniteWorkDir() + File.separator + IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
// Create file. Override by default.
FSDataOutputStream outputStream = fs.create(cfgPath, true);
IOUtils.copy(input, outputStream);
IOUtils.closeQuietly(input);
IOUtils.closeQuietly(outputStream);
}
else
cfgPath = new Path(props.igniteCfg());
}
/**
* Sets NMClient.
*
* @param nmClient NMClient.
*/
public void setNmClient(NMClient nmClient) {
this.nmClient = nmClient;
}
/**
* Sets RMClient
*
* @param rmClient AMRMClientAsync.
*/
public void setRmClient(AMRMClientAsync<AMRMClient.ContainerRequest> rmClient) {
this.rmClient = rmClient;
}
/**
* Sets scheduler timeout.
*
* @param schedulerTimeout Scheduler timeout.
*/
public void setSchedulerTimeout(long schedulerTimeout) {
this.schedulerTimeout = schedulerTimeout;
}
/**
* Sets file system.
* @param fs File system.
*/
public void setFs(FileSystem fs) {
this.fs = fs;
}
/**
* JUST FOR TESTING!!!
*
* @return Running containers.
*/
@Deprecated
public Map<ContainerId, IgniteContainer> getContainers() {
return containers;
}
}