blob: 49998cfa008cf0864725223b5f916d891126e0f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.debug;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.Stats.OperatorStats.PortStats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.StringCodec;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
import com.datatorrent.stram.api.ContainerEvent.NodeDeactivationEvent;
import com.datatorrent.stram.api.RequestFactory;
import com.datatorrent.stram.api.RequestFactory.RequestDelegate;
import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import net.engio.mbassy.listener.Handler;
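/**
 * <p>Map of the active {@link TupleRecorder}s, keyed by {@link OperatorIdPortNamePair}. A key whose
 * port name is null stands for a recording of the whole operator.</p>
 *
 * <p>Recordings are started, stopped and synced through the request delegates registered in
 * {@link #setup(Context)} and through the node activation/deactivation event handlers.</p>
 *
 * @since 0.3.5
 */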
public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, TupleRecorder> implements Component<Context>
{
// Read from LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE in setup(); passed to each recorder's
// storage through setBytesPerPartFile().
private int tupleRecordingPartFileSize;
// Read from LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS in setup(); passed to each recorder's
// storage through setMillisPerPartFile().
private long tupleRecordingPartFileTimeMillis;
// Read from LogicalPlan.APPLICATION_PATH in setup(); recordings are stored below appPath + "/recordings/".
private String appPath;
// Read from LogicalPlan.APPLICATION_ID in setup(); handed to every TupleRecorder created by this class.
private String appId;
// Built in setup(), handed to every recorder and torn down in teardown().
private SharedPubSubWebSocketClient wsClient;
// Read from Context.DAGContext.STRING_CODECS in setup(); passed on to TupleRecorder.setup().
private Map<Class<?>, Class<? extends StringCodec<?>>> codecs;
/**
 * Looks up the recorder stored for the given operator/port combination.
 *
 * @param operId id of the operator
 * @param portName name of the port; null is the key used for whole operator recordings
 * @return the recorder stored under that key, or null when the map holds none
 */
public TupleRecorder getTupleRecorder(int operId, String portName)
{
  final OperatorIdPortNamePair key = new OperatorIdPortNamePair(operId, portName);
  return get(key);
}
/**
 * Reads the recording related attributes from the context, builds the web socket client and
 * registers the delegate handling start, stop and sync recording requests.
 *
 * @param ctx context supplying the attribute values
 */
@Override
public void setup(Context ctx)
{
  tupleRecordingPartFileSize = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
  tupleRecordingPartFileTimeMillis = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
  appId = ctx.getValue(LogicalPlan.APPLICATION_ID);
  appPath = ctx.getValue(LogicalPlan.APPLICATION_PATH);
  codecs = ctx.getAttributes().get(Context.DAGContext.STRING_CODECS);
  wsClient = new PubSubWebSocketClientBuilder().setContext(ctx).build();

  RequestFactory requestFactory = ctx.getValue(ContainerContext.REQUEST_FACTORY);
  if (requestFactory == null) {
    // without a factory there is nowhere to register the delegate
    logger.warn("No request factory defined, recording is disabled!");
    return;
  }

  // a single delegate instance serves all three request types
  RequestDelegateImpl delegate = new RequestDelegateImpl();
  requestFactory.registerDelegate(StramToNodeRequest.RequestType.START_RECORDING, delegate);
  requestFactory.registerDelegate(StramToNodeRequest.RequestType.STOP_RECORDING, delegate);
  requestFactory.registerDelegate(StramToNodeRequest.RequestType.SYNC_RECORDING, delegate);
}
/**
 * Tears down every registered recorder, then the web socket client (if one was built), and finally
 * empties the map.
 */
@Override
public void teardown()
{
  Iterator<TupleRecorder> recorders = values().iterator();
  while (recorders.hasNext()) {
    recorders.next().teardown();
  }

  if (wsClient != null) {
    // SPOI-1328: clean up IO threads or else process won't exit
    wsClient.teardown();
  }

  clear();
}
/**
 * Builds the stream id of a port: the operator id, followed by
 * {@link Component#CONCAT_SEPARATOR}, followed by the port name.
 *
 * @param operatorId id of the operator to which the port belongs.
 * @param portname name of the port; must not be null, {@link String#concat(String)} rejects null.
 * @return the concatenated stream id; this method never returns null.
 */
public final String getDeclaredStreamId(int operatorId, String portname)
{
  final String operatorPart = Integer.toString(operatorId);
  final String operatorWithSeparator = operatorPart.concat(Component.CONCAT_SEPARATOR);
  return operatorWithSeparator.concat(portname);
}
/**
 * Starts recording tuples for a whole operator or for a single one of its ports.
 *
 * <p>The request is rejected with an error log entry when it overlaps an existing recording, see
 * {@link #hasRecordingConflict(PortMappingDescriptor, int, String)}. When no port matches the
 * request, a warning is logged and no recorder is registered.</p>
 *
 * @param id id handed to the new {@link TupleRecorder}; the activation handler passes null
 * @param node node hosting the operator; the recorder sinks are added to it
 * @param operatorId id of the operator to record
 * @param portName name of the port to record, or null to record all ports of the operator
 * @param numWindows when positive, handed to the recorder together with a callback which requests
 *          the recording to be stopped; otherwise no callback is installed
 */
private void startRecording(String id, final Node<?> node, int operatorId, final String portName, long numWindows)
{
  PortMappingDescriptor descriptor = node.getPortMappingDescriptor();
  OperatorIdPortNamePair operatorIdPortNamePair = new OperatorIdPortNamePair(operatorId, portName);

  if (hasRecordingConflict(descriptor, operatorId, portName)) {
    logger.error("Operator id {} is already being recorded.", operatorId);
    return;
  }

  logger.debug("Executing start recording request for {}", operatorIdPortNamePair);
  if (wsClient != null) {
    try {
      wsClient.openConnection();
    } catch (Exception e) {
      // best effort: the recorder is still created when the connection cannot be opened
      logger.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
    }
  }

  TupleRecorder tupleRecorder = new TupleRecorder(id, appId);
  tupleRecorder.setWebSocketClient(wsClient);
  HashMap<String, Sink<Object>> sinkMap = new HashMap<>();

  // input ports get a sink only when they carry a port context; output ports have no such check
  for (Map.Entry<String, PortContextPair<InputPort<?>>> entry : descriptor.inputPorts.entrySet()) {
    String inputPortName = entry.getKey();
    if (entry.getValue().context != null && (portName == null || inputPortName.equals(portName))) {
      // getDeclaredStreamId never returns null, so no fallback stream name is needed
      String streamId = getDeclaredStreamId(operatorId, inputPortName);
      logger.debug("Adding recorder sink to input port {}, stream {}", inputPortName, streamId);
      tupleRecorder.addInputPortInfo(inputPortName, streamId);
      sinkMap.put(inputPortName, tupleRecorder.newSink(inputPortName));
    }
  }
  for (Map.Entry<String, PortContextPair<OutputPort<?>>> entry : descriptor.outputPorts.entrySet()) {
    String outputPortName = entry.getKey();
    if (portName == null || outputPortName.equals(portName)) {
      String streamId = getDeclaredStreamId(operatorId, outputPortName);
      logger.debug("Adding recorder sink to output port {}, stream {}", outputPortName, streamId);
      tupleRecorder.addOutputPortInfo(outputPortName, streamId);
      sinkMap.put(outputPortName, tupleRecorder.newSink(outputPortName));
    }
  }

  if (sinkMap.isEmpty()) {
    logger.warn("Tuple recording request ignored because operator is not connected on the specified port.");
    return;
  }

  logger.debug("Started recording on {} through {}", operatorIdPortNamePair, System.identityHashCode(this));
  String basePath = appPath + "/recordings/" + operatorId + "/" + tupleRecorder.getId();
  tupleRecorder.getStorage().setBasePath(basePath);
  tupleRecorder.getStorage().setBytesPerPartFile(tupleRecordingPartFileSize);
  tupleRecorder.getStorage().setMillisPerPartFile(tupleRecordingPartFileTimeMillis);
  node.addSinks(sinkMap);
  tupleRecorder.setup(node.getOperator(), codecs);
  put(operatorIdPortNamePair, tupleRecorder);

  if (numWindows > 0) {
    // the callback does not stop the recording itself; it queues a request on the node's context
    tupleRecorder.setNumWindows(numWindows, new Runnable()
    {
      @Override
      public void run()
      {
        node.context.request(new OperatorRequest()
        {
          @Override
          public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
          {
            stopRecording(node, operatorId, portName);
            return null;
          }
        });
      }
    });
  }
}

/**
 * Tells whether a new recording for the given operator/port would overlap an existing one.
 *
 * <p>A conflict exists when the operator is already recorded as a whole, when the requested port is
 * already recorded, or - for a whole operator request (null port name) - when any input or output
 * port of the operator is already recorded.</p>
 *
 * @param descriptor port mapping of the operator, consulted for whole operator requests
 * @param operatorId id of the operator
 * @param portName requested port, or null for the whole operator
 * @return true when the request conflicts with a registered recording
 */
private boolean hasRecordingConflict(PortMappingDescriptor descriptor, int operatorId, String portName)
{
  if (containsKey(new OperatorIdPortNamePair(operatorId, null))) {
    return true;
  }
  if (portName != null) {
    return containsKey(new OperatorIdPortNamePair(operatorId, portName));
  }
  for (Map.Entry<String, PortContextPair<InputPort<?>>> entry : descriptor.inputPorts.entrySet()) {
    if (containsKey(new OperatorIdPortNamePair(operatorId, entry.getKey()))) {
      return true;
    }
  }
  for (Map.Entry<String, PortContextPair<OutputPort<?>>> entry : descriptor.outputPorts.entrySet()) {
    if (containsKey(new OperatorIdPortNamePair(operatorId, entry.getKey()))) {
      return true;
    }
  }
  return false;
}
/**
 * Stops the recording registered for the given operator/port.
 *
 * <p>When the exact key is registered, only that recording is stopped. Otherwise a null port name
 * stops every recording registered for the operator, and a non-null port name is reported as not
 * being recorded.</p>
 *
 * @param node node from which the recorder sinks are removed
 * @param operatorId id of the operator
 * @param portName name of the port, or null for the whole operator
 */
private void stopRecording(Node<?> node, int operatorId, String portName)
{
  final OperatorIdPortNamePair requestedKey = new OperatorIdPortNamePair(operatorId, portName);

  if (containsKey(requestedKey)) {
    logger.debug("Executing stop recording request for {}", requestedKey);
    final TupleRecorder recorder = get(requestedKey);
    if (recorder != null) {
      node.removeSinks(recorder.getSinkMap());
      recorder.teardown();
      logger.debug("Stopped recording for {}", requestedKey);
      remove(requestedKey);
    }
    return;
  }

  if (portName != null) {
    logger.error("Operator/port {} is not being recorded.", requestedKey);
    return;
  }

  // this should be looked at again when we redesign how we handle recordings with ports in a cleaner way
  for (Iterator<Map.Entry<OperatorIdPortNamePair, TupleRecorder>> it = entrySet().iterator(); it.hasNext(); ) {
    final Map.Entry<OperatorIdPortNamePair, TupleRecorder> candidate = it.next();
    if (operatorId != candidate.getKey().operatorId) {
      continue;
    }
    final TupleRecorder recorder = candidate.getValue();
    if (recorder == null) {
      continue;
    }
    node.removeSinks(recorder.getSinkMap());
    recorder.teardown();
    logger.debug("Stopped recording for operator/port {}", requestedKey);
    // removal has to go through the iterator while the entry set is being traversed
    it.remove();
  }
}
/**
 * Requests a sync on the storage of the recording registered for the given operator/port.
 *
 * <p>When the exact key is registered, only that recording is synced. Otherwise a null port name
 * syncs every recording registered for the operator, and a non-null port name is reported as not
 * being recorded.</p>
 *
 * @param node node hosting the operator; not used by this method
 * @param operatorId id of the operator
 * @param portName name of the port, or null for the whole operator
 */
private void syncRecording(Node<?> node, int operatorId, String portName)
{
  final OperatorIdPortNamePair requestedKey = new OperatorIdPortNamePair(operatorId, portName);

  if (containsKey(requestedKey)) {
    logger.debug("Executing sync recording request for {}", requestedKey);
    final TupleRecorder recorder = get(requestedKey);
    if (recorder != null) {
      recorder.getStorage().requestSync();
      logger.debug("Requested sync recording for operator/port {}", requestedKey);
    }
    return;
  }

  if (portName != null) {
    logger.error("(SYNC_RECORDING) Operator/port {} is not being recorded.", requestedKey);
    return;
  }

  // this should be looked at again when we redesign how we handle recordings with ports in a cleaner way
  for (Map.Entry<OperatorIdPortNamePair, TupleRecorder> candidate : entrySet()) {
    if (operatorId != candidate.getKey().operatorId) {
      continue;
    }
    final TupleRecorder recorder = candidate.getValue();
    if (recorder != null) {
      recorder.getStorage().requestSync();
      logger.debug("Requested sync recording for operator/port {}", requestedKey);
    }
  }
}
/**
 * Starts automatic recordings when a node is activated.
 *
 * <p>When the operator has {@code OperatorContext.AUTO_RECORD} set, the whole operator is recorded.
 * Otherwise a recording is started for each input and output port that has a port context with
 * {@code PortContext.AUTO_RECORD} set.</p>
 *
 * @param nae activation event carrying the node
 */
@Handler
public void activated(NodeActivationEvent nae)
{
  final Node<?> activatedNode = nae.getNode();

  if (activatedNode.context.getValue(OperatorContext.AUTO_RECORD)) {
    // operator level auto record covers all ports, no need to look at them one by one
    startRecording(null, activatedNode, activatedNode.getId(), null, 0);
    return;
  }

  for (Map.Entry<String, PortContextPair<InputPort<?>>> input : activatedNode.getPortMappingDescriptor().inputPorts.entrySet()) {
    final PortContextPair<InputPort<?>> inputPair = input.getValue();
    if (inputPair.context != null && inputPair.context.getValue(PortContext.AUTO_RECORD)) {
      startRecording(null, activatedNode, activatedNode.getId(), input.getKey(), 0);
    }
  }

  for (Map.Entry<String, PortContextPair<OutputPort<?>>> output : activatedNode.getPortMappingDescriptor().outputPorts.entrySet()) {
    final PortContextPair<OutputPort<?>> outputPair = output.getValue();
    if (outputPair.context != null && outputPair.context.getValue(PortContext.AUTO_RECORD)) {
      startRecording(null, activatedNode, activatedNode.getId(), output.getKey(), 0);
    }
  }
}
/**
 * Stops the recordings of a node when it is deactivated. The null port name makes
 * {@code stopRecording} cover the whole operator.
 *
 * @param nde deactivation event carrying the node
 */
@Handler
public void deactivated(NodeDeactivationEvent nde)
{
  final Node<?> deactivatedNode = nde.getNode();
  final int operatorId = deactivatedNode.getId();
  stopRecording(deactivatedNode, operatorId, null);
}
/**
 * Stamps the collected container stats with the ids of the active recordings.
 *
 * <p>All port level recording ids are reset first. Then, for each operator, a whole operator
 * recording sets the recording id on every window stats object of the operator. Without one, the
 * operator level id is set to null and the ports whose id matches a recorded port name receive the
 * id of the corresponding recorder.</p>
 *
 * @param cse event carrying the container stats to annotate
 */
@Handler
public void collected(ContainerStatsEvent cse)
{
  final ContainerStats containerStats = cse.getContainerStats();

  // first pass: wipe the recording id of every port
  for (OperatorHeartbeat heartbeat : containerStats.operators) {
    for (OperatorStats opStats : heartbeat.windowStats) {
      if (opStats.inputPorts != null) {
        for (PortStats portStats : opStats.inputPorts) {
          portStats.recordingId = null;
        }
      }
      if (opStats.outputPorts != null) {
        for (PortStats portStats : opStats.outputPorts) {
          portStats.recordingId = null;
        }
      }
    }
  }

  // second pass: fill in the ids of the recordings which are registered in this map
  for (OperatorHeartbeat heartbeat : containerStats.operators) {
    final TupleRecorder operatorRecorder = get(new OperatorIdPortNamePair(heartbeat.nodeId, null));
    String operatorRecordingId = null;

    if (operatorRecorder != null) {
      operatorRecordingId = operatorRecorder.getId();
    } else {
      // no whole operator recording, look for recordings of individual ports
      for (Map.Entry<OperatorIdPortNamePair, TupleRecorder> recording : this.entrySet()) {
        final OperatorIdPortNamePair recordedPair = recording.getKey();
        if (recordedPair.operatorId != heartbeat.nodeId) {
          continue;
        }
        for (OperatorStats opStats : heartbeat.windowStats) {
          if (opStats.inputPorts != null) {
            for (PortStats portStats : opStats.inputPorts) {
              if (portStats.id.equals(recordedPair.portName)) {
                portStats.recordingId = recording.getValue().getId();
                break;
              }
            }
          }
          if (opStats.outputPorts != null) {
            for (PortStats portStats : opStats.outputPorts) {
              if (portStats.id.equals(recordedPair.portName)) {
                portStats.recordingId = recording.getValue().getId();
                break;
              }
            }
          }
        }
      }
    }

    for (OperatorStats opStats : heartbeat.windowStats) {
      opStats.recordingId = operatorRecordingId;
    }
  }
}
/**
 * Turns start, stop and sync recording requests into {@link OperatorRequest}s which call the
 * matching method of the enclosing collection. Each executor returns null as its response and
 * overrides {@code toString} with a short label naming the action.
 */
private class RequestDelegateImpl implements RequestDelegate
{
  /**
   * Creates the executor for a recording request.
   *
   * @param node node the request is addressed to
   * @param snr the request; for START_RECORDING the executor casts it to
   *          {@link StramToNodeStartRecordingRequest} when it is executed
   * @return executor performing the requested recording action
   * @throws UnsupportedOperationException when the request type is not a recording request
   */
  @Override
  public OperatorRequest getRequestExecutor(final Node<?> node, final StramToNodeRequest snr)
  {
    switch (snr.getRequestType()) {
      case START_RECORDING:
        return new OperatorRequest()
        {
          @Override
          public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
          {
            StramToNodeStartRecordingRequest r = (StramToNodeStartRecordingRequest)snr;
            startRecording(r.getId(), node, operatorId, r.getPortName(), r.getNumWindows());
            return null;
          }

          @Override
          public String toString()
          {
            return "Start Recording";
          }
        };

      case STOP_RECORDING:
        return new OperatorRequest()
        {
          @Override
          public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
          {
            stopRecording(node, operatorId, snr.getPortName());
            return null;
          }

          @Override
          public String toString()
          {
            return "Stop Recording";
          }
        };

      case SYNC_RECORDING:
        return new OperatorRequest()
        {
          @Override
          public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
          {
            syncRecording(node, operatorId, snr.getPortName());
            return null;
          }

          @Override
          public String toString()
          {
            // label follows the Start/Stop pattern so the three requests can be told apart
            return "Sync Recording";
          }
        };

      default:
        // report the same value the switch dispatched on
        throw new UnsupportedOperationException("Unknown request type " + snr.getRequestType());
    }
  }
}
// Serialization version id; needed because the HashMap superclass is Serializable.
private static final long serialVersionUID = 201309112123L;
// SLF4J logger used by all methods of this class.
private static final Logger logger = LoggerFactory.getLogger(TupleRecorderCollection.class);
}