// blob: 41ee7cc396f37fcf7e805e45df6f2dc78fffa348 [file] [log] [blame]
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.apiserver.resources;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.FormParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.twitter.heron.apiserver.Constants;
import com.twitter.heron.apiserver.actions.ActionFactory;
import com.twitter.heron.apiserver.actions.ActionFactoryImpl;
import com.twitter.heron.apiserver.actions.ActionType;
import com.twitter.heron.apiserver.actions.Keys;
import com.twitter.heron.apiserver.utils.ConfigUtils;
import com.twitter.heron.apiserver.utils.FileHelper;
import com.twitter.heron.apiserver.utils.Logging;
import com.twitter.heron.common.basics.DryRunFormatType;
import com.twitter.heron.common.basics.FileUtils;
import com.twitter.heron.common.basics.Pair;
import com.twitter.heron.scheduler.dryrun.DryRunResponse;
import com.twitter.heron.scheduler.dryrun.SubmitDryRunResponse;
import com.twitter.heron.scheduler.dryrun.UpdateDryRunResponse;
import com.twitter.heron.scheduler.utils.DryRunRenders;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Key;
/**
 * JAX-RS resource rooted at {@code /topologies}.
 *
 * <p>Exposes endpoints to submit a topology and to activate, deactivate, restart,
 * update and kill an existing one. All endpoints produce JSON bodies.
 */
@Path("/topologies")
public class TopologyResource extends HeronResource {
private static final Logger LOG = LoggerFactory.getLogger(TopologyResource.class);
// file name of the package that submit() assembles inside the temporary topology directory
private static final String TOPOLOGY_TAR_GZ_FILENAME = "topology.tar.gz";
// status code returned when a request is missing required parameters
private static final int HTTP_UNPROCESSABLE_ENTITY_CODE = 422;
// names of the multipart form fields understood by submit()
private static final String FORM_KEY_NAME = "name";
private static final String FORM_KEY_CLUSTER = "cluster";
private static final String FORM_KEY_ROLE = "role";
private static final String FORM_KEY_ENVIRONMENT = "environment";
private static final String FORM_KEY_DEFINITION = "definition";
private static final String FORM_KEY_TOPOLOGY = "topology";
private static final String FORM_KEY_USER = "user";
// form fields consumed directly by submit(); any field NOT in this set is treated
// as a key=value submit override by getSubmitOverrides()
private static final Set<String> SUBMIT_TOPOLOGY_PARAMS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
FORM_KEY_NAME,
FORM_KEY_CLUSTER,
FORM_KEY_ROLE,
FORM_KEY_ENVIRONMENT,
FORM_KEY_DEFINITION,
FORM_KEY_TOPOLOGY,
FORM_KEY_USER
)
)
);
// form fields that must be present for submit(); environment and user are optional
// because submit() supplies defaults for them
private static final String[] REQUIRED_SUBMIT_TOPOLOGY_PARAMS = {
FORM_KEY_NAME,
FORM_KEY_CLUSTER,
FORM_KEY_ROLE,
FORM_KEY_DEFINITION,
FORM_KEY_TOPOLOGY
};
// request parameter carrying the new component parallelism for update()
private static final String PARAM_COMPONENT_PARALLELISM = "component_parallelism";
// presence of this parameter (regardless of its value) requests a dry run
private static final String PARAM_DRY_RUN = "dry_run";
// optional parameter selecting how a dry-run result is rendered
private static final String PARAM_DRY_RUN_FORMAT = "dry_run_format";
// render format used when dry_run_format is not supplied
private static final String DEFAULT_DRY_RUN_FORMAT = DryRunFormatType.TABLE.toString();
// path format /topologies/{cluster}/{role}/{environment}/{name}
private static final String TOPOLOGY_PATH_FORMAT = "/topologies/%s/%s/%s/%s";
// factory for the submit/runtime actions; always read through getActionFactory()
private final ActionFactory actionFactory = new ActionFactoryImpl();
/**
 * Submits a new topology described by a multipart form.
 *
 * <p>The form must contain the {@code name}, {@code cluster}, {@code role},
 * {@code definition} and {@code topology} fields. {@code environment} falls back to
 * {@link Constants#DEFAULT_HERON_ENVIRONMENT} and {@code user} falls back to the role.
 * Every other form field is forwarded as a key=value submit override. The uploaded
 * files, the server configuration and the overrides are staged in a temporary
 * directory, packaged as a tar.gz and handed to the submit action; the temporary
 * directory is always deleted before returning.
 *
 * @param form the multipart form holding the topology files and parameters
 * @return 201 with the topology coordinates on success, 422 when required fields are
 *     missing, 200 with the rendered plan when the action reports a dry run, or 500
 *     with the exception message for any other failure
 * @throws IOException if a failure occurs before the guarded section is entered,
 *     e.g. the temporary topology directory cannot be created
 */
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings({"IllegalCatch", "JavadocMethod"})
public Response submit(FormDataMultiPart form) throws IOException {
  // verify that we have all the required params
  final List<String> missingDataKeys =
      verifyKeys(form.getFields().keySet(), REQUIRED_SUBMIT_TOPOLOGY_PARAMS);
  if (!missingDataKeys.isEmpty()) {
    // return error since we are missing required parameters
    final String message = String.format("Validation failed missing required params: %s",
        missingDataKeys.toString());
    return Response.status(HTTP_UNPROCESSABLE_ENTITY_CODE)
        .type(MediaType.APPLICATION_JSON)
        .entity(createValidationError(message, missingDataKeys))
        .build();
  }

  final String topologyName = Forms.getString(form, FORM_KEY_NAME);
  final String cluster = Forms.getString(form, FORM_KEY_CLUSTER);
  final String role = Forms.getString(form, FORM_KEY_ROLE);
  final String environment =
      Forms.getString(form, FORM_KEY_ENVIRONMENT, Constants.DEFAULT_HERON_ENVIRONMENT);
  // the submitting user defaults to the role when not given explicitly
  final String user = Forms.getString(form, FORM_KEY_USER, role);

  // submit overrides are passed key=value
  final Map<String, String> submitOverrides = getSubmitOverrides(form);

  final String topologyDirectory =
      Files.createTempDirectory(topologyName).toFile().getAbsolutePath();

  try {
    // upload the topology definition file to the topology directory
    final FormDataBodyPart definitionFilePart = form.getField(FORM_KEY_DEFINITION);
    final File topologyDefinitionFile = Forms.uploadFile(definitionFilePart, topologyDirectory);

    // upload the topology binary file to the topology directory
    final FormDataBodyPart topologyFilePart = form.getField(FORM_KEY_TOPOLOGY);
    final File topologyBinaryFile = Forms.uploadFile(topologyFilePart, topologyDirectory);

    // a dry run is requested by the mere presence of the dry_run field
    final boolean isDryRun = form.getFields().containsKey(PARAM_DRY_RUN);

    final Config config = configWithKeyValues(
        Arrays.asList(
            Pair.create(Key.CLUSTER.value(), cluster),
            Pair.create(Key.TOPOLOGY_NAME.value(), topologyName),
            Pair.create(Key.ROLE.value(), role),
            Pair.create(Key.ENVIRON.value(), environment),
            Pair.create(Key.SUBMIT_USER.value(), user),
            Pair.create(Key.DRY_RUN.value(), isDryRun)
        )
    );

    // copy configuration files to the sandbox config location
    // topology-dir/<default-heron-sandbox-config>
    FileHelper.copyDirectory(
        Paths.get(getConfigurationDirectory()),
        Paths.get(topologyDirectory, Constants.DEFAULT_HERON_SANDBOX_CONFIG));

    final java.nio.file.Path overridesPath =
        Paths.get(topologyDirectory, Constants.DEFAULT_HERON_SANDBOX_CONFIG,
            Constants.OVERRIDE_FILE);
    // copy override file into topology configuration directory
    FileHelper.copy(Paths.get(getConfigurationOverridePath()), overridesPath);

    // apply submit overrides
    ConfigUtils.applyOverrides(overridesPath, submitOverrides);

    // apply overrides to state manager config
    ConfigUtils.applyOverridesToStateManagerConfig(overridesPath,
        Paths.get(topologyDirectory, Constants.DEFAULT_HERON_SANDBOX_CONFIG,
            Constants.STATE_MANAGER_FILE)
    );

    // create tar file from the contents of the topology directory
    final File topologyPackageFile =
        Paths.get(topologyDirectory, TOPOLOGY_TAR_GZ_FILENAME).toFile();
    FileHelper.createTarGz(topologyPackageFile, FileHelper.getChildren(topologyDirectory));

    // submit the topology
    getActionFactory()
        .createSubmitAction(config,
            topologyPackageFile.getAbsolutePath(),
            topologyBinaryFile.getName(),
            topologyDefinitionFile.getAbsolutePath())
        .execute();

    return Response.created(
        URI.create(String.format(TOPOLOGY_PATH_FORMAT,
            cluster, role, environment, topologyName)))
        .type(MediaType.APPLICATION_JSON)
        .entity(createdResponse(cluster, role, environment, topologyName)).build();
  } catch (SubmitDryRunResponse response) {
    // a dry run surfaces its result as an exception; render it instead of failing
    return createDryRunResponse(response,
        Forms.getString(form, PARAM_DRY_RUN_FORMAT, DEFAULT_DRY_RUN_FORMAT));
  } catch (Exception ex) {
    // keep the stack trace server side; the client only receives the message
    LOG.error("error submitting topology {}", topologyName, ex);
    return Response.serverError()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(ex.getMessage()))
        .build();
  } finally {
    // always remove the staging directory, whatever the outcome
    FileUtils.deleteDir(topologyDirectory);
  }
}
/**
 * Activates the topology identified by the path parameters.
 *
 * @param cluster the cluster the topology runs in
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param name the topology name
 * @return 200 with a confirmation message, or 500 with the exception message when the
 *     activate action fails
 */
@POST
@Path("/{cluster}/{role}/{environment}/{name}/activate")
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings("IllegalCatch")
public Response activate(
    final @PathParam("cluster") String cluster,
    final @PathParam("role") String role,
    final @PathParam("environment") String environment,
    final @PathParam("name") String name) {
  try {
    final Config config = getConfig(cluster, role, environment, name);
    getActionFactory().createRuntimeAction(config, ActionType.ACTIVATE).execute();

    return Response.ok()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(String.format("%s activated", name)))
        .build();
  } catch (Exception ex) {
    // keep the stack trace server side; the client only receives the message
    LOG.error("error activating topology {}", name, ex);
    return Response.serverError()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(ex.getMessage()))
        .build();
  }
}
/**
 * Deactivates the topology identified by the path parameters.
 *
 * @param cluster the cluster the topology runs in
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param name the topology name
 * @return 200 with a confirmation message, or 500 with the exception message when the
 *     deactivate action fails
 */
@POST
@Path("/{cluster}/{role}/{environment}/{name}/deactivate")
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings("IllegalCatch")
public Response deactivate(
    final @PathParam("cluster") String cluster,
    final @PathParam("role") String role,
    final @PathParam("environment") String environment,
    final @PathParam("name") String name) {
  try {
    final Config config = getConfig(cluster, role, environment, name);
    getActionFactory().createRuntimeAction(config, ActionType.DEACTIVATE).execute();

    return Response.ok()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(String.format("%s deactivated", name)))
        .build();
  } catch (Exception ex) {
    // keep the stack trace server side; the client only receives the message
    LOG.error("error deactivating topology {}", name, ex);
    return Response.serverError()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(ex.getMessage()))
        .build();
  }
}
/**
 * Restarts the topology identified by the path parameters.
 *
 * @param cluster the cluster the topology runs in
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param name the topology name
 * @param containerId value of the {@code container_id} form parameter, {@code -1} when
 *     absent (presumably meaning "all containers" - confirm against the restart action)
 * @return 200 with a confirmation message, or 500 with the exception message when the
 *     restart action fails
 */
@POST
@Path("/{cluster}/{role}/{environment}/{name}/restart")
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings("IllegalCatch")
public Response restart(
    final @PathParam("cluster") String cluster,
    final @PathParam("role") String role,
    final @PathParam("environment") String environment,
    final @PathParam("name") String name,
    final @DefaultValue("-1") @FormParam("container_id") int containerId) {
  try {
    // the list is only read, so no mutable copy is needed
    final Config config = configWithKeyValues(
        Arrays.asList(
            Pair.create(Key.CLUSTER.value(), cluster),
            Pair.create(Key.ROLE.value(), role),
            Pair.create(Key.ENVIRON.value(), environment),
            Pair.create(Key.TOPOLOGY_NAME.value(), name),
            Pair.create(Key.TOPOLOGY_CONTAINER_ID.value(), containerId)
        )
    );
    getActionFactory().createRuntimeAction(config, ActionType.RESTART).execute();

    return Response.ok()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(String.format("%s restarted", name)))
        .build();
  } catch (Exception ex) {
    // keep the stack trace server side; the client only receives the message
    LOG.error("error restarting topology {}", name, ex);
    return Response.serverError()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(ex.getMessage()))
        .build();
  }
}
/**
 * Updates the component parallelism of a running topology.
 *
 * <p>The request must carry at least one {@code component_parallelism} parameter; all
 * of its values are joined with commas and passed to the update action. The presence
 * of {@code dry_run} requests a dry run. Every parameter other than
 * {@code component_parallelism} is additionally copied into the configuration.
 *
 * @param cluster the cluster the topology runs in
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param name the topology name
 * @param params the request parameters
 * @return 200 with a confirmation message, 422 when {@code component_parallelism} is
 *     missing, 200 with the rendered plan for a dry run, or 500 with the exception
 *     message for any other failure
 */
@POST
@Path("/{cluster}/{role}/{environment}/{name}/update")
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings({"IllegalCatch", "JavadocMethod"})
public Response update(
    final @PathParam("cluster") String cluster,
    final @PathParam("role") String role,
    final @PathParam("environment") String environment,
    final @PathParam("name") String name,
    MultivaluedMap<String, String> params) {
  try {
    final boolean hasParallelism =
        params != null && params.containsKey(PARAM_COMPONENT_PARALLELISM);
    if (!hasParallelism) {
      return Response.status(HTTP_UNPROCESSABLE_ENTITY_CODE)
          .type(MediaType.APPLICATION_JSON)
          .entity(createMessage("missing component_parallelism param"))
          .build();
    }

    // repeated component_parallelism values collapse into one comma separated string
    final List<String> parallelismValues = params.get(PARAM_COMPONENT_PARALLELISM);

    final List<Pair<String, Object>> settings = new ArrayList<>();
    settings.add(Pair.create(Key.CLUSTER.value(), cluster));
    settings.add(Pair.create(Key.ROLE.value(), role));
    settings.add(Pair.create(Key.ENVIRON.value(), environment));
    settings.add(Pair.create(Key.TOPOLOGY_NAME.value(), name));
    settings.add(Pair.create(Keys.NEW_COMPONENT_PARALLELISM_KEY,
        String.join(",", parallelismValues)));

    // the mere presence of dry_run switches the action into dry-run mode
    if (params.containsKey(PARAM_DRY_RUN)) {
      settings.add(Pair.create(Key.DRY_RUN.value(), Boolean.TRUE));
    }

    // remaining request parameters ride along as overrides (no-op when there are none)
    settings.addAll(getUpdateOverrides(params));

    getActionFactory()
        .createRuntimeAction(configWithKeyValues(settings), ActionType.UPDATE)
        .execute();

    final String confirmation = createMessage(String.format("%s updated", name));
    return Response.ok()
        .type(MediaType.APPLICATION_JSON)
        .entity(confirmation)
        .build();
  } catch (UpdateDryRunResponse response) {
    // a dry run surfaces its result as an exception; render it in the requested format
    final String format =
        Forms.getFirstOrDefault(params, PARAM_DRY_RUN_FORMAT, DEFAULT_DRY_RUN_FORMAT);
    return createDryRunResponse(response, format);
  } catch (Exception ex) {
    LOG.error("error updating topology {}", name, ex);
    return Response.serverError()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(ex.getMessage()))
        .build();
  }
}
/**
 * Kills the topology identified by the path parameters.
 *
 * @param cluster the cluster the topology runs in
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param name the topology name
 * @return 200 with a confirmation message, 404 when the failure message contains
 *     "does not exist", or 500 with the exception message for any other failure
 */
@DELETE
@Path("/{cluster}/{role}/{environment}/{name}")
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings("IllegalCatch")
public Response kill(
    final @PathParam("cluster") String cluster,
    final @PathParam("role") String role,
    final @PathParam("environment") String environment,
    final @PathParam("name") String name) {
  try {
    final Config config = getConfig(cluster, role, environment, name);
    getActionFactory().createRuntimeAction(config, ActionType.KILL).execute();

    return Response.ok()
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(String.format("%s killed", name)))
        .build();
  } catch (Exception ex) {
    // getMessage() may be null (e.g. a bare NullPointerException); guard against it so
    // the handler itself cannot throw and lose the JSON error response
    final String message = ex.getMessage();
    final boolean notFound = message != null && message.contains("does not exist");
    final Response.Status status =
        notFound ? Response.Status.NOT_FOUND : Response.Status.INTERNAL_SERVER_ERROR;
    if (!notFound) {
      // only unexpected failures are worth a stack trace in the server log
      LOG.error("error killing topology {}", name, ex);
    }
    return Response.status(status)
        .type(MediaType.APPLICATION_JSON)
        .entity(createMessage(message))
        .build();
  }
}
/**
 * Returns the factory used by the endpoints to create their actions. Package-private,
 * so code in the same package (e.g. a test subclass) can supply a different factory.
 */
ActionFactory getActionFactory() {
  return this.actionFactory;
}
/**
 * Builds the configuration that addresses a single topology.
 *
 * @param cluster the cluster the topology runs in
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param topologyName the topology name
 * @return the base configuration extended with the four identifying values
 */
private Config getConfig(String cluster, String role, String environment, String topologyName) {
  final List<Pair<String, Object>> coordinates = Arrays.asList(
      Pair.create(Key.CLUSTER.value(), cluster),
      Pair.create(Key.ROLE.value(), role),
      Pair.create(Key.ENVIRON.value(), environment),
      Pair.create(Key.TOPOLOGY_NAME.value(), topologyName));
  return configWithKeyValues(coordinates);
}
/**
 * Layers the given key/value pairs on top of the base configuration.
 *
 * <p>The verbose flag is written last, so it always reflects the current logging
 * setting even if one of the pairs uses the same key.
 *
 * @param keyValues the entries to add, applied in iteration order
 * @return the resulting configuration converted to local mode
 */
private Config configWithKeyValues(Collection<Pair<String, Object>> keyValues) {
  final Config.Builder configBuilder = Config.newBuilder().putAll(getBaseConfiguration());
  keyValues.forEach(entry -> configBuilder.put(entry.first, entry.second));
  configBuilder.put(Key.VERBOSE, Logging.isVerbose());

  final Config assembled = configBuilder.build();
  return Config.toLocalMode(assembled);
}
/**
 * Determines which of the required keys are absent from {@code keys}.
 *
 * @param keys the keys that are present
 * @param requiredKeys the keys that must be present; may be {@code null}
 * @return a new mutable list of the required keys not contained in {@code keys}, in
 *     the order they were given; empty when nothing is missing or nothing is required
 */
private static List<String> verifyKeys(Set<String> keys, String... requiredKeys) {
  if (requiredKeys == null) {
    return new ArrayList<>();
  }
  // start from everything that is required and strike out what is present
  final List<String> absent = new ArrayList<>(Arrays.asList(requiredKeys));
  absent.removeIf(required -> keys.contains(required));
  return absent;
}
/**
 * Collects the form fields that are not regular submit parameters.
 *
 * @param form the submitted multipart form
 * @return a map from field name to its string value for every field whose name is not
 *     in {@code SUBMIT_TOPOLOGY_PARAMS}
 */
private static Map<String, String> getSubmitOverrides(FormDataMultiPart form) {
  final Map<String, String> extras = new HashMap<>();
  for (String field : form.getFields().keySet()) {
    if (SUBMIT_TOPOLOGY_PARAMS.contains(field)) {
      // consumed directly by submit(), not an override
      continue;
    }
    extras.put(field, Forms.getString(form, field));
  }
  return extras;
}
/**
 * Collects the update request parameters that should be passed on as overrides.
 *
 * @param params the request parameters
 * @return one pair per parameter name (using its first value), skipping
 *     {@code component_parallelism} regardless of letter case
 */
private static Set<Pair<String, Object>> getUpdateOverrides(
    MultivaluedMap<String, String> params) {
  final Set<Pair<String, Object>> extras = new HashSet<>();
  for (String name : params.keySet()) {
    if (PARAM_COMPONENT_PARALLELISM.equalsIgnoreCase(name)) {
      // handled explicitly by update(), not an override
      continue;
    }
    extras.add(Pair.create(name, params.getFirst(name)));
  }
  return extras;
}
/**
 * Resolves the dry-run render format from its textual name.
 *
 * <p>The lookup uses {@code DryRunFormatType.valueOf}, so the name must match an enum
 * constant exactly (including case).
 *
 * @param type the requested format name; may be {@code null}
 * @return the matching format, or {@code DryRunFormatType.TABLE} when {@code type} is
 *     {@code null} or does not name a known format
 */
private static DryRunFormatType getDryRunFormatType(String type) {
  if (type == null) {
    return DryRunFormatType.TABLE;
  }
  try {
    return DryRunFormatType.valueOf(type);
  } catch (IllegalArgumentException ex) {
    // valueOf signals an unknown constant name with IllegalArgumentException; nothing
    // else can be thrown for a non-null name, so no blanket catch is needed
    LOG.warn("unknown dry format render type {} defaulting to table", type);
    return DryRunFormatType.TABLE;
  }
}
/**
 * Renders a dry-run result as text.
 *
 * @param response the dry-run result raised by a submit or update action
 * @param type the requested render format name, resolved by {@code getDryRunFormatType}
 * @return the rendered result, or a message naming the response class when it is
 *     neither a submit nor an update dry-run response
 */
private static String getDryRunResponse(DryRunResponse response, String type) {
  if (response instanceof SubmitDryRunResponse) {
    final SubmitDryRunResponse submitResult = (SubmitDryRunResponse) response;
    return DryRunRenders.render(submitResult, getDryRunFormatType(type));
  }
  if (response instanceof UpdateDryRunResponse) {
    final UpdateDryRunResponse updateResult = (UpdateDryRunResponse) response;
    return DryRunRenders.render(updateResult, getDryRunFormatType(type));
  }
  return "Unknown dry run response type " + response.getClass().getName();
}
/**
 * Wraps a rendered dry-run result in a 200 JSON response.
 *
 * @param response the dry-run result to render
 * @param type the requested render format name
 * @return a 200 response whose body is a JSON object with a single {@code response}
 *     field holding the rendered text
 */
private static Response createDryRunResponse(DryRunResponse response, String type) {
  final ObjectNode payload = new ObjectMapper().createObjectNode();
  payload.put("response", getDryRunResponse(response, type));

  return Response.ok()
      .type(MediaType.APPLICATION_JSON)
      .entity(payload.toString())
      .build();
}
/**
 * Builds the JSON body returned after a successful submit.
 *
 * @param cluster the cluster the topology was submitted to
 * @param role the role owning the topology
 * @param environment the environment of the topology
 * @param topologyName the topology name
 * @return a JSON object string with the fields {@code name}, {@code cluster},
 *     {@code role} and {@code environment}, in that order
 */
private static String createdResponse(String cluster, String role, String environment,
    String topologyName) {
  final ObjectNode created = new ObjectMapper().createObjectNode();
  created.put("name", topologyName);
  created.put("cluster", cluster);
  created.put("role", role);
  created.put("environment", environment);
  return created.toString();
}
/**
 * Creates the JSON object shared by all message and error bodies.
 *
 * @param message the text to report
 * @return a new JSON object containing only a {@code message} field
 */
private static ObjectNode createBaseError(String message) {
  final ObjectNode error = new ObjectMapper().createObjectNode();
  error.put("message", message);
  return error;
}
/**
 * Serializes a plain message body.
 *
 * @param message the text to report
 * @return the JSON string of an object containing only a {@code message} field
 */
private static String createMessage(String message) {
  final ObjectNode body = createBaseError(message);
  return body.toString();
}
/**
 * Serializes a validation error listing the missing parameters.
 *
 * @param message the text to report
 * @param missing the names of the parameters that were not supplied
 * @return the JSON string of an object with a {@code message} field and an
 *     {@code errors.missing_parameters} array holding the given names in order
 */
private static String createValidationError(String message, List<String> missing) {
  final ObjectNode root = createBaseError(message);
  final ArrayNode missingParameters =
      root.putObject("errors").putArray("missing_parameters");
  missing.forEach(parameter -> missingParameters.add(parameter));
  return root.toString();
}
}