blob: 52be922874085a2a9abb8da152c5cdfa6e621558 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.webapp;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.module.SimpleModule;
import org.codehaus.jackson.map.ser.std.SerializerBase;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.log4j.DTLoggerFactory;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StringCodec;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.codec.LogicalPlanSerializer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.util.ConfigValidator;
import com.datatorrent.stram.util.JSONSerializationProvider;
/**
*
* The web services implementation in the stram<p>
* <br>
* This class would ensure the the caller is authorized and then provide access to all the dag data stored
* in the stram<br>
* <br>
*
* @since 0.3.2
*/
@Singleton
@Path(StramWebServices.PATH)
public class StramWebServices
{
private static final Logger LOG = LoggerFactory.getLogger(StramWebServices.class);
public static final String PATH = WebServices.PATH + "/" + WebServices.VERSION + "/stram";
public static final String PATH_INFO = "info";
public static final String PATH_PHYSICAL_PLAN = "physicalPlan";
public static final String PATH_PHYSICAL_PLAN_OPERATORS = PATH_PHYSICAL_PLAN + "/operators";
public static final String PATH_PHYSICAL_PLAN_STREAMS = PATH_PHYSICAL_PLAN + "/streams";
public static final String PATH_PHYSICAL_PLAN_CONTAINERS = PATH_PHYSICAL_PLAN + "/containers";
public static final String PATH_SHUTDOWN = "shutdown";
public static final String PATH_RECORDINGS = "recordings";
public static final String PATH_RECORDINGS_START = PATH_RECORDINGS + "/start";
public static final String PATH_RECORDINGS_STOP = PATH_RECORDINGS + "/stop";
public static final String PATH_LOGICAL_PLAN = "logicalPlan";
public static final String PATH_LOGICAL_PLAN_OPERATORS = PATH_LOGICAL_PLAN + "/operators";
public static final String PATH_OPERATOR_CLASSES = "operatorClasses";
public static final String PATH_ALERTS = "alerts";
public static final String PATH_LOGGERS = "loggers";
public static final String PATH_STACKTRACE = "stackTrace";
public static final long WAIT_TIME = 5000;
public static final long STACK_TRACE_WAIT_TIME = 1000;
public static final long STACK_TRACE_ATTEMPTS = 10;
//public static final String PATH_ACTION_OPERATOR_CLASSES = "actionOperatorClasses";
private StramAppContext appCtx;
@Context
private HttpServletResponse httpResponse;
@Inject
@Nullable
private StreamingContainerManager dagManager;
private ObjectMapper objectMapper = new JSONSerializationProvider().getContext(null);
private boolean initialized = false;
private OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer();
@Inject
public StramWebServices(StramAppContext context)
{
this.appCtx = context;
}
Boolean hasAccess(HttpServletRequest request)
{
String remoteUser = request.getRemoteUser();
if (remoteUser != null) {
UserGroupInformation callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
if (callerUGI != null) {
return false;
}
}
return true;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void init()
{
//clear content type
httpResponse.setContentType(null);
if (!initialized) {
Map<Class<?>, Class<? extends StringCodec<?>>> codecs = dagManager.getApplicationAttributes().get(DAGContext.STRING_CODECS);
StringCodecs.loadConverters(codecs);
if (codecs != null) {
SimpleModule sm = new SimpleModule("DTSerializationModule", new Version(1, 0, 0, null));
for (Map.Entry<Class<?>, Class<? extends StringCodec<?>>> entry : codecs.entrySet()) {
try {
final StringCodec<Object> codec = (StringCodec<Object>)entry.getValue().newInstance();
sm.addSerializer(new SerializerBase(entry.getKey())
{
@Override
public void serialize(Object value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException
{
jgen.writeString(codec.toString(value));
}
});
} catch (Exception ex) {
LOG.error("Caught exception when instantiating codec for class {}", entry.getKey().getName(), ex);
}
}
objectMapper.registerModule(sm);
}
initialized = true;
}
}
void checkAccess(HttpServletRequest request)
{
if (!hasAccess(request)) {
throw new SecurityException();
}
}
@GET
@Produces(MediaType.APPLICATION_JSON)
public JSONObject get() throws Exception
{
return getAppInfo();
}
@GET
@Path(PATH_INFO)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getAppInfo() throws Exception
{
init();
return new JSONObject(objectMapper.writeValueAsString(new AppInfo(this.appCtx)));
}
@GET
@Path(PATH_PHYSICAL_PLAN)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPhysicalPlan() throws Exception
{
init();
Map<String, Object> result = new HashMap<>();
result.put("operators", dagManager.getOperatorInfoList());
result.put("streams", dagManager.getStreamInfoList());
return new JSONObject(objectMapper.writeValueAsString(result));
}
@GET
@Path(PATH_PHYSICAL_PLAN_OPERATORS)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorsInfo() throws Exception
{
init();
OperatorsInfo nodeList = new OperatorsInfo();
nodeList.operators = dagManager.getOperatorInfoList();
// To get around the nasty JAXB problem for lists
return new JSONObject(objectMapper.writeValueAsString(nodeList));
}
@GET
@Path(PATH_PHYSICAL_PLAN_STREAMS)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getStreamsInfo() throws Exception
{
init();
StreamsInfo streamList = new StreamsInfo();
streamList.streams = dagManager.getStreamInfoList();
return new JSONObject(objectMapper.writeValueAsString(streamList));
}
@GET
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorInfo(@PathParam("operatorId") int operatorId) throws Exception
{
init();
OperatorInfo oi = dagManager.getOperatorInfo(operatorId);
if (oi == null) {
throw new NotFoundException();
}
return new JSONObject(objectMapper.writeValueAsString(oi));
}
@GET
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/ports")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPortsInfo(@PathParam("operatorId") int operatorId) throws Exception
{
init();
Map<String, Object> map = new HashMap<>();
OperatorInfo oi = dagManager.getOperatorInfo(operatorId);
if (oi == null) {
throw new NotFoundException();
}
map.put("ports", oi.ports);
return new JSONObject(objectMapper.writeValueAsString(map));
}
@GET
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/ports/{portName}")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPortsInfo(@PathParam("operatorId") int operatorId, @PathParam("portName") String portName) throws Exception
{
init();
OperatorInfo oi = dagManager.getOperatorInfo(operatorId);
if (oi == null) {
throw new NotFoundException();
}
for (PortInfo pi : oi.ports) {
if (pi.name.equals(portName)) {
return new JSONObject(objectMapper.writeValueAsString(pi));
}
}
throw new NotFoundException();
}
@GET
@Path(PATH_OPERATOR_CLASSES)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorClasses(@QueryParam("q") String searchTerm, @QueryParam("parent") String parent)
{
init();
JSONObject result = new JSONObject();
JSONArray classNames = new JSONArray();
if (parent != null) {
if (parent.equals("chart")) {
parent = "com.datatorrent.lib.chart.ChartOperator";
} else if (parent.equals("filter")) {
parent = "com.datatorrent.common.util.SimpleFilterOperator";
}
}
try {
Set<String> operatorClasses = operatorDiscoverer.getOperatorClasses(parent, searchTerm);
for (String clazz : operatorClasses) {
JSONObject j = new JSONObject();
j.put("name", clazz);
classNames.put(j);
}
result.put("operatorClasses", classNames);
} catch (ClassNotFoundException ex) {
throw new NotFoundException();
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
return result;
}
@GET
@Path(PATH_OPERATOR_CLASSES + "/{className}")
@Produces(MediaType.APPLICATION_JSON)
@SuppressWarnings("unchecked")
public JSONObject describeOperator(@PathParam("className") String className)
{
init();
if (className == null) {
throw new UnsupportedOperationException();
}
try {
Class<?> clazz = Class.forName(className);
if (Operator.class.isAssignableFrom(clazz)) {
return operatorDiscoverer.describeOperator(className);
} else {
throw new NotFoundException();
}
} catch (Exception ex) {
throw new NotFoundException();
}
}
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_SHUTDOWN)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject shutdown()
{
init();
LOG.debug("Shutdown requested");
dagManager.shutdownAllContainers("Shutdown requested externally.");
return new JSONObject();
}
private static String getTupleRecordingId()
{
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
String val = sdf.format(new Date());
val += "-";
byte[] r = new byte[4];
new Random().nextBytes(r);
val += Base64.encodeBase64URLSafeString(r);
return val;
}
@POST
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{opId:\\d+}/" + PATH_RECORDINGS_START)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject startRecording(@PathParam("opId") int opId, String content) throws JSONException
{
init();
LOG.debug("Start recording on {} requested", opId);
JSONObject response = new JSONObject();
long numWindows = 0;
if (StringUtils.isNotBlank(content)) {
JSONObject r = new JSONObject(content);
numWindows = r.optLong("numWindows", 0);
}
String id = getTupleRecordingId();
dagManager.startRecording(id, opId, null, numWindows);
response.put("id", id);
return response;
}
@POST
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{opId:\\d+}/ports/{portName}/" + PATH_RECORDINGS_START)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject startRecording(@PathParam("opId") int opId, @PathParam("portName") String portName, String content) throws JSONException
{
init();
LOG.debug("Start recording on {}.{} requested", opId, portName);
JSONObject response = new JSONObject();
long numWindows = 0;
if (StringUtils.isNotBlank(content)) {
JSONObject r = new JSONObject(content);
numWindows = r.optLong("numWindows", 0);
}
String id = getTupleRecordingId();
dagManager.startRecording(id, opId, portName, numWindows);
response.put("id", id);
return response;
}
@POST
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{opId:\\d+}/" + PATH_RECORDINGS_STOP)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject stopRecording(@PathParam("opId") int opId)
{
init();
LOG.debug("Start recording on {} requested", opId);
JSONObject response = new JSONObject();
dagManager.stopRecording(opId, null);
return response;
}
@POST
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{opId:\\d+}/ports/{portName}/" + PATH_RECORDINGS_STOP)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject stopRecording(@PathParam("opId") int opId, @PathParam("portName") String portName)
{
init();
LOG.debug("Stop recording on {}.{} requested", opId, portName);
JSONObject response = new JSONObject();
dagManager.stopRecording(opId, portName);
return response;
}
@GET
@Path(PATH_PHYSICAL_PLAN_CONTAINERS)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject listContainers(@QueryParam("states") String states) throws Exception
{
init();
Set<String> stateSet = null;
if (states != null) {
stateSet = new HashSet<>();
stateSet.addAll(Arrays.asList(StringUtils.split(states, ',')));
}
ContainersInfo ci = new ContainersInfo();
for (ContainerInfo containerInfo : dagManager.getCompletedContainerInfo()) {
if (stateSet == null || stateSet.contains(containerInfo.state)) {
ci.add(containerInfo);
}
}
Collection<StreamingContainerAgent> containerAgents = dagManager.getContainerAgents();
// add itself (app master container)
ContainerInfo appMasterContainerInfo = dagManager.getAppMasterContainerInfo();
if (stateSet == null || stateSet.contains(appMasterContainerInfo.state)) {
ci.add(appMasterContainerInfo);
}
for (StreamingContainerAgent sca : containerAgents) {
ContainerInfo containerInfo = sca.getContainerInfo();
if (stateSet == null || stateSet.contains(containerInfo.state)) {
ci.add(containerInfo);
}
}
// To get around the nasty JAXB problem for lists
return new JSONObject(objectMapper.writeValueAsString(ci));
}
@GET
@Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getContainer(@PathParam("containerId") String containerId) throws Exception
{
init();
ContainerInfo ci = null;
if (containerId.equals(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()))) {
ci = dagManager.getAppMasterContainerInfo();
} else {
for (ContainerInfo containerInfo : dagManager.getCompletedContainerInfo()) {
if (containerInfo.id.equals(containerId)) {
ci = containerInfo;
}
}
if (ci == null) {
StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
if (sca == null) {
throw new NotFoundException();
}
ci = sca.getContainerInfo();
}
}
return new JSONObject(objectMapper.writeValueAsString(ci));
}
@GET
@Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/" + PATH_STACKTRACE)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getContainerStackTrace(@PathParam("containerId") String containerId) throws Exception
{
init();
if (containerId.equals(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()))) {
return StramUtils.getStackTrace();
}
StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
if (sca == null) {
throw new NotFoundException("Container not found.");
}
if (!sca.getContainerInfo().state.equals("ACTIVE")) {
throw new NotFoundException("Container is not active.");
}
for (int i = 0; i < STACK_TRACE_ATTEMPTS; ++i) {
String result = sca.getStackTrace();
if (result != null) {
return new JSONObject(result);
}
Thread.sleep(STACK_TRACE_WAIT_TIME);
}
throw new TimeoutException("Not able to get the stack trace");
}
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_PHYSICAL_PLAN_CONTAINERS + "/{containerId}/kill")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject killContainer(@PathParam("containerId") String containerId)
{
init();
JSONObject response = new JSONObject();
if (containerId.equals(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()))) {
LOG.info("Received a kill request on application master container. Exiting.");
new Thread()
{
@Override
public void run()
{
try {
Thread.sleep(3000);
System.exit(1);
} catch (InterruptedException ex) {
LOG.info("Received interrupt, aborting exit.");
}
}
}.start();
} else {
dagManager.stopContainer(containerId);
}
return response;
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getLogicalOperators() throws Exception
{
init();
LogicalOperatorsInfo nodeList = new LogicalOperatorsInfo();
nodeList.operators = dagManager.getLogicalOperatorInfoList();
return new JSONObject(objectMapper.writeValueAsString(nodeList));
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getLogicalOperator(@PathParam("operatorName") String operatorName) throws Exception
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
if (logicalOperator == null) {
throw new NotFoundException();
}
LogicalOperatorInfo logicalOperatorInfo = dagManager.getLogicalOperatorInfo(operatorName);
return new JSONObject(objectMapper.writeValueAsString(logicalOperatorInfo));
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/aggregation")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorAggregation(@PathParam("operatorName") String operatorName) throws Exception
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
if (logicalOperator == null) {
throw new NotFoundException();
}
OperatorAggregationInfo operatorAggregationInfo = dagManager.getOperatorAggregationInfo(operatorName);
return new JSONObject(objectMapper.writeValueAsString(operatorAggregationInfo));
}
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/properties")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject setOperatorProperties(JSONObject request, @PathParam("operatorName") String operatorName)
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
if (logicalOperator == null) {
throw new NotFoundException();
}
JSONObject response = new JSONObject();
try {
@SuppressWarnings("unchecked")
Iterator<String> keys = request.keys();
while (keys.hasNext()) {
String key = keys.next();
String val = request.isNull(key) ? null : request.getString(key);
LOG.debug("Setting property for {}: {}={}", operatorName, key, val);
dagManager.setOperatorProperty(operatorName, key, val);
}
} catch (JSONException ex) {
LOG.warn("Got JSON Exception: ", ex);
} catch (Exception ex) {
LOG.error("Caught exception: ", ex);
throw new RuntimeException(ex);
}
return response;
}
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject setPhysicalOperatorProperties(JSONObject request, @PathParam("operatorId") int operatorId)
{
init();
JSONObject response = new JSONObject();
try {
@SuppressWarnings("unchecked")
Iterator<String> keys = request.keys();
while (keys.hasNext()) {
String key = keys.next();
String val = request.isNull(key) ? null : request.getString(key);
dagManager.setPhysicalOperatorProperty(operatorId, key, val);
}
} catch (JSONException ex) {
LOG.warn("Got JSON Exception: ", ex);
}
return response;
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/attributes")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorAttributes(@PathParam("operatorName") String operatorName, @QueryParam("attributeName") String attributeName)
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
if (logicalOperator == null) {
throw new NotFoundException();
}
HashMap<String, String> map = new HashMap<>();
for (Map.Entry<Attribute<?>, Object> entry : dagManager.getOperatorAttributes(operatorName).entrySet()) {
if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName)) {
Map.Entry<Attribute<Object>, Object> entry1 = (Map.Entry<Attribute<Object>, Object>)(Map.Entry)entry;
map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue()));
}
}
return new JSONObject(map);
}
@GET
@Path(PATH_LOGICAL_PLAN + "/attributes")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getApplicationAttributes(@QueryParam("attributeName") String attributeName)
{
init();
HashMap<String, String> map = new HashMap<>();
for (Map.Entry<Attribute<?>, Object> entry : dagManager.getApplicationAttributes().entrySet()) {
if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName)) {
Map.Entry<Attribute<Object>, Object> entry1 = (Map.Entry<Attribute<Object>, Object>)(Map.Entry)entry;
map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue()));
}
}
return new JSONObject(map);
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPorts(@PathParam("operatorName") String operatorName)
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
Set<LogicalPlan.InputPortMeta> inputPorts;
Set<LogicalPlan.OutputPortMeta> outputPorts;
if (logicalOperator == null) {
ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
if (logicalModule == null) {
throw new NotFoundException();
}
inputPorts = logicalModule.getInputStreams().keySet();
outputPorts = logicalModule.getOutputStreams().keySet();
} else {
inputPorts = logicalOperator.getInputStreams().keySet();
outputPorts = logicalOperator.getOutputStreams().keySet();
}
JSONObject result = getPortsObjects(inputPorts, outputPorts);
return result;
}
private JSONObject getPortsObjects(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs)
{
JSONObject result = new JSONObject();
JSONArray ports = new JSONArray();
try {
for (LogicalPlan.InputPortMeta inputPort : inputs) {
JSONObject port = new JSONObject();
port.put("name", inputPort.getPortName());
port.put("type", "input");
ports.put(port);
}
for (LogicalPlan.OutputPortMeta outputPort : outputs) {
JSONObject port = new JSONObject();
port.put("name", outputPort.getPortName());
port.put("type", "output");
ports.put(port);
}
result.put("ports", ports);
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
return result;
}
private JSONObject getPortObject(Collection<LogicalPlan.InputPortMeta> inputs,
Collection<LogicalPlan.OutputPortMeta> outputs,
String portName) throws JSONException
{
for (LogicalPlan.InputPortMeta inputPort : inputs) {
if (inputPort.getPortName().equals(portName)) {
JSONObject port = new JSONObject();
port.put("name", inputPort.getPortName());
port.put("type", "input");
return port;
}
}
for (LogicalPlan.OutputPortMeta outputPort : outputs) {
if (outputPort.getPortName().equals(portName)) {
JSONObject port = new JSONObject();
port.put("name", outputPort.getPortName());
port.put("type", "output");
return port;
}
}
return null;
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPort(@PathParam("operatorName") String operatorName, @PathParam("portName") String portName)
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
Set<LogicalPlan.InputPortMeta> inputPorts;
Set<LogicalPlan.OutputPortMeta> outputPorts;
if (logicalOperator == null) {
ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
if (logicalModule == null) {
throw new NotFoundException();
}
inputPorts = logicalModule.getInputStreams().keySet();
outputPorts = logicalModule.getOutputStreams().keySet();
} else {
inputPorts = logicalOperator.getInputStreams().keySet();
outputPorts = logicalOperator.getOutputStreams().keySet();
}
try {
JSONObject resp = getPortObject(inputPorts, outputPorts, portName);
if (resp != null) {
return resp;
}
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
throw new NotFoundException();
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}/attributes")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPortAttributes(@PathParam("operatorName") String operatorName, @PathParam("portName") String portName, @QueryParam("attributeName") String attributeName)
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
if (logicalOperator == null) {
throw new NotFoundException();
}
HashMap<String, String> map = new HashMap<>();
for (Map.Entry<Attribute<?>, Object> entry : dagManager.getPortAttributes(operatorName, portName).entrySet()) {
if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName)) {
Map.Entry<Attribute<Object>, Object> entry1 = (Map.Entry<Attribute<Object>, Object>)(Map.Entry)entry;
map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue()));
}
}
return new JSONObject(map);
}
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/properties")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorProperties(@PathParam("operatorName") String operatorName, @QueryParam("propertyName") String propertyName) throws IOException, JSONException
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
BeanMap operatorProperties = null;
if (logicalOperator == null) {
ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
if (logicalModule == null) {
throw new NotFoundException();
}
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule());
} else {
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
}
Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties);
return new JSONObject(objectMapper.writeValueAsString(m));
}
private Map<String, Object> getPropertiesAsMap(@QueryParam("propertyName") String propertyName, BeanMap operatorProperties)
{
Map<String, Object> m = new HashMap<>();
@SuppressWarnings("rawtypes")
Iterator entryIterator = operatorProperties.entryIterator();
while (entryIterator.hasNext()) {
try {
@SuppressWarnings("unchecked")
Map.Entry<String, Object> entry = (Map.Entry<String, Object>)entryIterator.next();
if (propertyName == null) {
m.put(entry.getKey(), entry.getValue());
} else if (propertyName.equals(entry.getKey())) {
m.put(entry.getKey(), entry.getValue());
break;
}
} catch (Exception ex) {
LOG.warn("Caught exception", ex);
}
}
return m;
}
@GET
@Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getPhysicalOperatorProperties(@PathParam("operatorId") int operatorId, @QueryParam("propertyName") String propertyName, @QueryParam("waitTime") long waitTime)
{
init();
if (waitTime == 0) {
waitTime = WAIT_TIME;
}
Future<?> future = dagManager.getPhysicalOperatorProperty(operatorId, propertyName, waitTime);
try {
Object object = future.get(waitTime, TimeUnit.MILLISECONDS);
if (object != null) {
return new JSONObject(new ObjectMapper().writeValueAsString(object));
}
} catch (Exception ex) {
LOG.warn("Caught exception", ex);
throw new RuntimeException(ex);
}
return new JSONObject();
}
@GET
@Path(PATH_LOGICAL_PLAN)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getLogicalPlan(@QueryParam("includeModules") String includeModules) throws JSONException, IOException
{
init();
return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(
dagManager.getLogicalPlan(), includeModules != null)));
}
@POST // not supported by WebAppProxyServlet, can only be called directly
@Path(PATH_LOGICAL_PLAN)
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject logicalPlanModification(JSONObject request)
{
init();
JSONObject response = new JSONObject();
try {
JSONArray jsonArray = request.getJSONArray("requests");
List<LogicalPlanRequest> requests = new ArrayList<>();
for (int i = 0; i < jsonArray.length(); i++) {
JSONObject jsonObj = (JSONObject)jsonArray.get(i);
LogicalPlanRequest requestObj = (LogicalPlanRequest)Class.forName(LogicalPlanRequest.class.getPackage().getName() + "." + jsonObj.getString("requestType")).newInstance();
@SuppressWarnings("unchecked")
Map<String, String> properties = BeanUtils.describe(requestObj);
@SuppressWarnings("unchecked")
Iterator<String> keys = jsonObj.keys();
while (keys.hasNext()) {
String key = keys.next();
if (!key.equals("requestType")) {
properties.put(key, jsonObj.get(key).toString());
}
}
BeanUtils.populate(requestObj, properties);
requests.add(requestObj);
}
Future<?> fr = dagManager.logicalPlanModification(requests);
fr.get(3000, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
LOG.error("Error processing plan change", ex);
try {
if (ex instanceof ExecutionException) {
response.put("error", ex.getCause().toString());
} else {
response.put("error", ex.toString());
}
} catch (Exception e) {
// ignore
}
}
return response;
}
@POST
@Path(PATH_LOGGERS)
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject setLoggersLevel(JSONObject request)
{
init();
JSONObject response = new JSONObject();
Map<String, String> targetChanges = Maps.newHashMap();
try {
@SuppressWarnings("unchecked")
JSONArray loggerArray = request.getJSONArray("loggers");
for (int i = 0; i < loggerArray.length(); i++) {
JSONObject loggerNode = loggerArray.getJSONObject(i);
String target = loggerNode.getString("target");
String level = loggerNode.getString("logLevel");
if (ConfigValidator.validateLoggersLevel(target, level)) {
LOG.info("changing logger level for {} to {}", target, level);
targetChanges.put(target, level);
} else {
LOG.warn("incorrect logger settings {}:{}", target, level);
}
}
if (!targetChanges.isEmpty()) {
dagManager.setLoggersLevel(Collections.unmodifiableMap(targetChanges));
//Changing the levels on Stram after sending the message to all containers.
DTLoggerFactory.getInstance().changeLoggersLevel(targetChanges);
}
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
return response;
}
@GET
@Path(PATH_LOGGERS + "/search")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject searchLoggersLevel(@QueryParam("pattern") String pattern)
{
init();
JSONObject response = new JSONObject();
JSONArray loggersArray = new JSONArray();
try {
if (pattern != null) {
Map<String, String> matches = DTLoggerFactory.getInstance().getClassesMatching(pattern);
for (Map.Entry<String, String> match : matches.entrySet()) {
JSONObject node = new JSONObject();
node.put("name", match.getKey());
node.put("level", match.getValue());
loggersArray.put(node);
}
}
response.put("loggers", loggersArray);
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
return response;
}
@GET
@Path(PATH_LOGGERS)
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getLoggerLevels() throws JSONException
{
init();
JSONObject response = new JSONObject();
JSONArray levelsArray = new JSONArray();
Map<String, String> currentLevels = DTLoggerFactory.getInstance().getPatternLevels();
for (Map.Entry<String, String> lvl : currentLevels.entrySet()) {
JSONObject node = new JSONObject();
node.put("target", lvl.getKey());
node.put("logLevel", lvl.getValue());
levelsArray.put(node);
}
response.put("loggers", levelsArray);
return response;
}
}