blob: 5cd710a7bc0627a9b0b953af044a3c2ac637d35a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.aurora;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.utils.ShellUtils;
/**
* Implementation of AuroraController that shells out to the Aurora CLI to control the Aurora
* scheduler workflow of a topology.
*/
class AuroraCLIController implements AuroraController {
  private static final Logger LOG = Logger.getLogger(AuroraCLIController.class.getName());

  private static final String ERR_PROMPT =
      "The topology can be in a strange stage. Please check carefully or redeploy the topology !!";

  // Text that precedes the bracketed list of instance ids in the output of "aurora job add".
  private static final String INSTANCE_STATUS_MARKER = "Querying instance statuses: [";

  // Job key in the form <cluster>/<role>/<env>/<name>.
  private final String jobSpec;
  private final boolean isVerbose;
  // Aurora configuration file handed to "aurora job create".
  private final String auroraFilename;

  /**
   * Creates a controller bound to a single Aurora job.
   *
   * @param jobName name of the job; last component of the job key
   * @param cluster Aurora cluster; first component of the job key
   * @param role Aurora role; second component of the job key
   * @param env Aurora environment; third component of the job key
   * @param auroraFilename Aurora configuration file passed to "aurora job create"
   * @param isVerbose whether "--verbose" is appended to the issued commands
   */
  AuroraCLIController(
      String jobName,
      String cluster,
      String role,
      String env,
      String auroraFilename,
      boolean isVerbose) {
    this.auroraFilename = auroraFilename;
    this.isVerbose = isVerbose;
    this.jobSpec = String.format("%s/%s/%s/%s", cluster, role, env, jobName);
  }

  /**
   * Runs "aurora job create --wait-until RUNNING" for this job.
   *
   * <p>Every entry of {@code bindings} and {@code extra} is passed as a "--bind key=value"
   * argument, except the {@code AuroraContext.JOB_TEMPLATE} entry of {@code extra}: its value
   * replaces the "heron.aurora" part of the configuration file name for this invocation only.
   *
   * @param bindings field bindings passed to the Aurora configuration file
   * @param extra additional bindings, optionally including the job template override
   * @return true if the command exited with status 0
   */
  @Override
  public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) {
    List<String> auroraCmd =
        new ArrayList<>(Arrays.asList("aurora", "job", "create", "--wait-until", "RUNNING"));

    for (Map.Entry<AuroraField, String> binding : bindings.entrySet()) {
      auroraCmd.add("--bind");
      auroraCmd.add(String.format("%s=%s", binding.getKey(), binding.getValue()));
    }

    // Resolve the template override into a local variable so that the configured file name is
    // never overwritten and repeated calls always start from the same base name.
    String configFile = auroraFilename;
    for (Map.Entry<String, String> entry : extra.entrySet()) {
      if (entry.getKey().equals(AuroraContext.JOB_TEMPLATE)) {
        configFile = configFile.replace("heron.aurora", entry.getValue());
      } else {
        auroraCmd.add("--bind");
        auroraCmd.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
      }
    }

    auroraCmd.add(jobSpec);
    auroraCmd.add(configFile);

    // Only "--verbose" is added here; the batch size option is not used for "create".
    if (isVerbose) {
      auroraCmd.add("--verbose");
    }

    return runProcess(auroraCmd);
  }

  /**
   * Kills the whole job by running "aurora job killall".
   *
   * @return true if the command exited with status 0
   */
  @Override
  public boolean killJob() {
    List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "killall"));
    auroraCmd.add(jobSpec);

    appendAuroraCommandOptions(auroraCmd, isVerbose);

    return runProcess(auroraCmd);
  }

  /**
   * Restarts the job, or a single instance of it, by running "aurora job restart".
   *
   * @param containerId instance to restart, or null to restart the whole job
   * @return true if the command exited with status 0
   */
  @Override
  public boolean restart(Integer containerId) {
    List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "restart"));
    if (containerId != null) {
      auroraCmd.add(String.format("%s/%d", jobSpec, containerId));
    } else {
      auroraCmd.add(jobSpec);
    }

    appendAuroraCommandOptions(auroraCmd, isVerbose);

    return runProcess(auroraCmd);
  }

  /**
   * Kills the Aurora instances backing the given containers by running
   * "aurora job kill &lt;cluster&gt;/&lt;role&gt;/&lt;env&gt;/&lt;name&gt;/&lt;instance_ids&gt;".
   *
   * <p>Does nothing when {@code containersToRemove} is empty.
   *
   * @param containersToRemove containers whose ids are used as the instance ids to kill
   * @throws RuntimeException if the kill command exits with a non-zero status
   */
  @Override
  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
    if (containersToRemove.isEmpty()) {
      // An empty id list would yield a job key ending in "/" with no instances to act on.
      LOG.info("No aurora containers to kill");
      return;
    }

    String instancesToKill = getInstancesIdsToKill(containersToRemove);
    //aurora job kill <cluster>/<role>/<env>/<name>/<instance_ids>
    List<String> auroraCmd = new ArrayList<>(Arrays.asList(
        "aurora", "job", "kill", jobSpec + "/" + instancesToKill));

    appendAuroraCommandOptions(auroraCmd, isVerbose);
    LOG.info(String.format(
        "Killing %s aurora containers: %s", containersToRemove.size(), auroraCmd));
    if (!runProcess(auroraCmd)) {
      throw new RuntimeException("Failed to kill freed aurora instances: " + instancesToKill);
    }
  }

  /**
   * Adds {@code count} instances to the job by cloning instance 0 with "aurora job add".
   *
   * <p>"--verbose" is always passed here, independent of the verbose setting, because the new
   * instance ids are parsed out of the command output.
   *
   * @param count number of instances to add
   * @return ids of the instances reported in the command output; empty if none were reported
   * @throws RuntimeException if the command fails or produces no output
   */
  @Override
  public Set<Integer> addContainers(Integer count) {
    //aurora job add <cluster>/<role>/<env>/<name>/<instance_id> <count>
    //clone instance 0
    List<String> auroraCmd = new ArrayList<>(Arrays.asList(
        "aurora", "job", "add", "--wait-until", "RUNNING",
        jobSpec + "/0", count.toString(), "--verbose"));

    LOG.info(String.format("Requesting %s new aurora containers %s", count, auroraCmd));
    StringBuilder stderr = new StringBuilder();
    if (!runProcess(auroraCmd, null, stderr)) {
      throw new RuntimeException(
          "Failed to create " + count + " new aurora instances. " + ERR_PROMPT);
    }

    if (stderr.length() <= 0) { // no container was added
      throw new RuntimeException("Empty output by Aurora. " + ERR_PROMPT);
    }
    return extractContainerIds(stderr.toString());
  }

  /**
   * Parses the instance ids out of the text following "Querying instance statuses: [" up to the
   * next "]".
   *
   * @param auroraOutputStr output captured from the "aurora job add" command
   * @return the parsed ids; empty if the marker is absent or the bracketed list is empty
   * @throws RuntimeException if the marker is present but its closing bracket is missing
   * @throws NumberFormatException if an entry of the list is not an integer
   */
  private static Set<Integer> extractContainerIds(String auroraOutputStr) {
    int markerIdx = auroraOutputStr.indexOf(INSTANCE_STATUS_MARKER);
    if (markerIdx < 0) { // no container was added
      LOG.info("stdout & stderr by Aurora " + auroraOutputStr);
      return new HashSet<>();
    }

    int listStart = markerIdx + INSTANCE_STATUS_MARKER.length();
    int listEnd = auroraOutputStr.indexOf(']', listStart);
    if (listEnd < 0) {
      // Fail with a descriptive message instead of an index error from substring().
      LOG.severe("Unterminated instance list in Aurora output: " + auroraOutputStr);
      throw new RuntimeException("Malformed instance list in Aurora output. " + ERR_PROMPT);
    }

    String containerIdStr = auroraOutputStr.substring(listStart, listEnd);
    LOG.info("container IDs returned by Aurora " + containerIdStr);

    // Split on commas and trim so that both "1, 2" and "1,2" parse; blank tokens are skipped so
    // that an empty list "[]" produces an empty set rather than a parse failure.
    return Arrays.stream(containerIdStr.split(","))
        .map(String::trim)
        .filter(token -> !token.isEmpty())
        .map(Integer::valueOf)
        .collect(Collectors.toSet());
  }

  /**
   * Runs the given command through {@code ShellUtils.runProcess} and reports whether it exited
   * with status 0. Kept overridable for unit tests.
   *
   * @param auroraCmd command and arguments to execute
   * @param stdout only echoed in the failure log; this method never writes to it
   * @param stderr builder that receives the process output, or null if the caller does not
   *     need it (presumably stdout and stderr combined -- confirm against ShellUtils)
   * @return true if the process exited with status 0
   */
  @VisibleForTesting
  boolean runProcess(List<String> auroraCmd, StringBuilder stdout, StringBuilder stderr) {
    // Always capture the output, even when the caller passed null, so that a failure can be
    // logged together with what the process actually printed.
    StringBuilder output = stderr != null ? stderr : new StringBuilder();
    int status = ShellUtils.runProcess(auroraCmd.toArray(new String[0]), output);

    if (status != 0) {
      LOG.severe(String.format(
          "Failed to run process. Command=%s, STDOUT=%s, STDERR=%s", auroraCmd, stdout, output));
    }
    return status == 0;
  }

  /**
   * Runs the given command without handing its output back to the caller. Kept overridable for
   * unit tests.
   *
   * @param auroraCmd command and arguments to execute
   * @return true if the process exited with status 0
   */
  @VisibleForTesting
  boolean runProcess(List<String> auroraCmd) {
    return runProcess(auroraCmd, null, null);
  }

  /**
   * Joins the ids of the given containers into a comma-separated string.
   *
   * @param containersToRemove containers whose ids are joined
   * @return the ids separated by ","; empty string for an empty set
   */
  private static String getInstancesIdsToKill(Set<PackingPlan.ContainerPlan> containersToRemove) {
    StringBuilder ids = new StringBuilder();
    for (PackingPlan.ContainerPlan containerPlan : containersToRemove) {
      if (ids.length() > 0) {
        ids.append(",");
      }
      ids.append(containerPlan.getId());
    }
    return ids.toString();
  }

  /**
   * Appends the verbose flag (if requested) and the batch size option to a command.
   *
   * @param auroraCmd command being assembled; modified in place
   * @param isVerbose whether "--verbose" should be appended
   */
  private static void appendAuroraCommandOptions(List<String> auroraCmd, boolean isVerbose) {
    // Append verbose if needed
    if (isVerbose) {
      auroraCmd.add("--verbose");
    }

    // Append batch size.
    // Note that we can not use "--no-batching" since "restart" command does not accept it.
    // So we play a small trick here by setting batch size Integer.MAX_VALUE.
    auroraCmd.add("--batch-size");
    auroraCmd.add(Integer.toString(Integer.MAX_VALUE));
  }
}