blob: 4f6bbfa0bb971e95caffa6ce3b7b1b6c90daa9ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.debug;
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.codehaus.jackson.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StringCodec;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.common.util.ObjectMapperString;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient.Handler;
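/**
* <p>
* Records the tuples that pass through an operator's ports. Control and data tuples are serialized with a
* {@link StreamCodec} and appended to part files through an {@link FSPartFileCollection}; when a
* {@link SharedPubSubWebSocketClient} is set and reports subscribers, data tuples are also published to it.</p>
*
* @since 0.3.2
*/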
public class TupleRecorder
{
/** Format version; setup() writes it as the first line of the recording metadata. */
public static final String VERSION = "1.2";
// number of data tuples handed to writeTuple, counted before serialization is attempted
private long totalTupleCount = 0;
private final HashMap<String, PortInfo> portMap = new HashMap<>(); // port name -> PortInfo for every registered input or output port
private final HashMap<String, PortCount> portCountMap = new HashMap<>(); // used for tupleCount of each port <name, count> map
// id of the window most recently passed to beginWindow
private transient long currentWindowId = WindowGenerator.MIN_WINDOW_ID - 1;
// ranges of window ids seen since the index extra info was last reset; the last range stays open until closed
private transient ArrayList<Range> windowIdRanges = new ArrayList<>();
// set exactly once through setStartTime; INVALID_TIME_MILLIS means "not started"
private long startTime = Stats.INVALID_TIME_MILLIS;
// recorder id; setup() falls back to the start time when this is null
private String id;
private final String appId;
// id that will be assigned to the next registered port
private int nextPortIndex = 0;
// sinks created by newSink, keyed by the key passed to it
private final HashMap<String, Sink<Object>> sinks = new HashMap<>();
// number of endWindow calls since the last beginWindow that changed the window id
private transient long endWindowTuplesProcessed = 0;
// codec used to serialize tuples and metadata; assigned by setStreamCodec and replaced in setup()
private transient StreamCodec<Object> streamCodec;
// subscriber count reported by the WebSocket handler; writeTuple publishes only while it is positive
// NOTE(review): the handler may run on a different thread than writeTuple and this field is not volatile -- confirm
private int numSubscribers = 0;
private SharedPubSubWebSocketClient wsClient;
// topic used both for the subscriber-count handler and for publishing tuple data
private String recordingNameTopic;
private long numWindows = Long.MAX_VALUE; // number of windows to record
private Runnable stopProcedure; // stop procedure to execute
private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
// If there are errors processing tuples, don't log an error for every tuple as it could overwhelm the logs.
// The property specifies the minumum number of tuples between two consecutive error log statements. Set it to zero to
// log every tuple error
private static long ERROR_LOG_GAP;
long lastLog = -1;
static {
ERROR_LOG_GAP = 10000L;
String property = System.getProperty("org.apache.apex.stram.tupleRecorder.errorLogGap");
if (property != null) {
try {
long value = Long.decode(property);
if (value < 0 ) {
logger.warn("Log gap should be greater than or equal to 0, setting to default");
} else {
ERROR_LOG_GAP = value;
}
} catch (Exception ex) {
logger.warn("Unable to parse the log gap property, setting to default", ex);
}
}
logger.debug("Log gap is {}", ERROR_LOG_GAP);
}
private final FSPartFileCollection storage = new FSPartFileCollection()
{
  /**
   * Builds the extra text stored with an index entry:
   * {@code <window ranges>:<length of counts>:<counts>}, where counts is a JSON-like object that maps
   * each port id to the number of tuples counted for that port.
   *
   * @return the extra info, or null when no window range has been recorded
   */
  @Override
  protected String getIndexExtraInfo()
  {
    if (windowIdRanges.isEmpty()) {
      return null;
    }
    // close the still-open last range at the current window before it is rendered
    Range openRange = windowIdRanges.get(windowIdRanges.size() - 1);
    openRange.high = TupleRecorder.this.currentWindowId;

    StringBuilder counts = new StringBuilder("{");
    String separator = "";
    for (PortCount portCount : portCountMap.values()) {
      counts.append(separator);
      counts.append("\"").append(portCount.id).append("\":\"").append(portCount.count).append("\"");
      separator = ",";
    }
    counts.append("}");

    return convertToString(windowIdRanges) + ":" + counts.length() + ":" + counts.toString();
  }

  /**
   * Zeroes every per-port tuple count and forgets the recorded window ranges.
   */
  @Override
  protected void resetIndexExtraInfo()
  {
    for (PortCount portCount : portCountMap.values()) {
      portCount.count = 0;
    }
    windowIdRanges.clear();
  }
};
/**
 * Creates a recorder.
 *
 * @param id recorder id; may be null, in which case setup() derives it from the start time
 * @param appId application id stored in the recording metadata and used in the WebSocket topic name
 */
public TupleRecorder(String id, String appId)
{
  this.appId = appId;
  this.id = id;
}
/**
 * @return the part-file collection this recorder writes to
 */
public FSPartFileCollection getStorage()
{
  return this.storage;
}
/**
 * Creates a sink that records everything put into it and registers it under the given key,
 * replacing any sink previously registered under the same key.
 *
 * @param key key for the sink map; also used by the sink as the port name when writing tuples
 * @return the newly created sink
 */
public RecorderSink newSink(String key)
{
  final RecorderSink sink = new RecorderSink(key);
  this.sinks.put(key, sink);
  return sink;
}
/**
 * Sets the stream codec used to serialize what is written to the files.
 * The serialization method must not produce newlines.
 * For serializations that produce binary, base64 is recommended.
 * Note that setup() assigns a new JsonStreamCodec, replacing a codec set here beforehand.
 *
 * @param streamCodec the codec to serialize with
 */
public void setStreamCodec(StreamCodec<Object> streamCodec)
{
  TupleRecorder.this.streamCodec = streamCodec;
}
/**
 * Sets the WebSocket client used to publish recorded tuples and index lines.
 *
 * @param wsClient the client; when null nothing is published
 */
public void setWebSocketClient(SharedPubSubWebSocketClient wsClient)
{
  TupleRecorder.this.wsClient = wsClient;
}
/**
 * @return a read-only view of the registered ports, keyed by port name
 */
public Map<String, PortInfo> getPortInfoMap()
{
  final Map<String, PortInfo> readOnlyPorts = Collections.unmodifiableMap(portMap);
  return readOnlyPorts;
}
/**
 * @return the number of data tuples handed to writeTuple so far
 */
public long getTotalTupleCount()
{
  return this.totalTupleCount;
}
/**
 * @return a read-only view of the sinks created through newSink, keyed by the key they were created with
 */
public Map<String, Sink<Object>> getSinkMap()
{
  final Map<String, Sink<Object>> readOnlySinks = Collections.unmodifiableMap(sinks);
  return readOnlySinks;
}
/**
 * Sets the start time of the recording; allowed only once.
 *
 * @param startTime the startTime to set
 * @throws IllegalStateException if a start time has already been set
 */
public void setStartTime(long startTime)
{
  final boolean alreadyStarted = this.startTime != Stats.INVALID_TIME_MILLIS;
  if (alreadyStarted) {
    throw new IllegalStateException("Tuple recorder has already started at " + this.startTime);
  }
  this.startTime = startTime;
}
/**
* Describes one recorded port. setup() serializes each instance with the stream codec and writes it
* as one line of the recording metadata.
*/
public static class PortInfo
{
// port name as passed to addInputPortInfo / addOutputPortInfo
public String name;
// name of the stream attached to the port
public String streamName;
// "input" or "output", depending on which add*PortInfo method registered the port
public String type;
// id assigned at registration; used in the tuple records and the index counts
public int id;
}
/**
* Tuple count of one port, reported in the index extra info and reset together with it.
*/
public static class PortCount
{
// id of the port, same value as the matching PortInfo.id
public int id;
// number of tuples counted for the port since the last reset
public long count;
}
/**
* Header of a recording; setup() serializes it and writes it to the metadata right after the version line.
*/
public static class RecordInfo
{
// start time of the recording in milliseconds
public long startTime;
// id of the application the recording belongs to
public String appId;
// operator property name -> serialized property value; the value is null when serialization failed
public Map<String, ObjectMapperString> properties = new HashMap<>();
}
/**
 * Inclusive range of window ids; both bounds are -1 until they are set.
 */
public static class Range
{
  public long low = -1;
  public long high = -1;

  /** Creates a range with both bounds unset (-1). */
  public Range()
  {
  }

  /**
   * @param low first window id of the range
   * @param high last window id of the range
   */
  public Range(long low, long high)
  {
    this.low = low;
    this.high = high;
  }

  /**
   * @return the range formatted as {@code [low,high]}
   */
  @Override
  public String toString()
  {
    StringBuilder text = new StringBuilder();
    text.append("[").append(low).append(",").append(high).append("]");
    return text.toString();
  }
}
/**
 * @return the start time of the recording, or Stats.INVALID_TIME_MILLIS when none has been set
 */
public long getStartTime()
{
  return this.startTime;
}
/**
 * @return the recorder id; may be null until setup() has assigned a default
 */
public String getId()
{
  return this.id;
}
/**
 * Registers an input port for recording.
 *
 * @param portName name of the port; replaces an earlier registration under the same name
 * @param streamName name of the stream attached to the port
 */
public void addInputPortInfo(String portName, String streamName)
{
  registerPort(portName, streamName, "input");
}

/**
 * Registers an output port for recording.
 *
 * @param portName name of the port; replaces an earlier registration under the same name
 * @param streamName name of the stream attached to the port
 */
public void addOutputPortInfo(String portName, String streamName)
{
  registerPort(portName, streamName, "output");
}

/**
 * Creates the PortInfo and the zeroed PortCount of a port and stores both under the port name.
 */
private void registerPort(String portName, String streamName, String type)
{
  PortInfo portInfo = new PortInfo();
  portInfo.name = portName;
  portInfo.streamName = streamName;
  portInfo.type = type;
  portInfo.id = nextPortIndex++;
  portMap.put(portName, portInfo);

  PortCount portCount = new PortCount();
  portCount.id = portInfo.id;
  portCount.count = 0;
  portCountMap.put(portName, portCount);
}
/**
 * Logs the shutdown and tears down the underlying storage.
 */
public void teardown()
{
  logger.info("Closing down tuple recorder.");
  storage.teardown();
}
/**
 * Starts the recording: sets up the storage, fixes the start time and writes the metadata, which
 * consists of the version line, one line with the serialized RecordInfo and one line per registered port.
 * When a WebSocket client is set, the subscriber-count handler is registered as well.
 * Failures are logged and not propagated.
 *
 * @param operator operator whose readable bean properties are stored in the metadata; may be null
 * @param codecs string codecs handed to the JsonStreamCodec that this method installs as the stream codec
 */
public void setup(Operator operator, Map<Class<?>, Class<? extends StringCodec<?>>> codecs)
{
  try {
    storage.setup();
    setStartTime(System.currentTimeMillis());
    if (id == null) {
      id = String.valueOf(startTime);
    }
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // explicit charset so the metadata bytes do not depend on the platform default
    bos.write((VERSION + "\n").getBytes(StandardCharsets.UTF_8));
    RecordInfo recordInfo = new RecordInfo();
    recordInfo.startTime = startTime;
    recordInfo.appId = appId;
    streamCodec = new JsonStreamCodec<>(codecs);
    if (operator != null) {
      BeanInfo beanInfo = Introspector.getBeanInfo(operator.getClass());
      for (PropertyDescriptor pd : beanInfo.getPropertyDescriptors()) {
        String name = pd.getName();
        Method readMethod = pd.getReadMethod();
        if (readMethod == null) {
          continue;
        }
        readMethod.setAccessible(true);
        try {
          Slice value = streamCodec.toByteArray(readMethod.invoke(operator));
          recordInfo.properties.put(name, new ObjectMapperString(value.stringValue()));
        } catch (Throwable t) {
          // best effort: a property that cannot be read or serialized is recorded as null;
          // the cause is logged so the failure can be diagnosed
          logger.warn("Cannot serialize property {} for operator {}", name, operator.getClass(), t);
          recordInfo.properties.put(name, null);
        }
      }
    }
    Slice f = streamCodec.toByteArray(recordInfo);
    bos.write(f.buffer, f.offset, f.length);
    bos.write('\n');
    for (PortInfo pi : portMap.values()) {
      f = streamCodec.toByteArray(pi);
      bos.write(f.buffer, f.offset, f.length);
      bos.write('\n');
    }
    storage.writeMetaData(bos.toByteArray());
    if (wsClient != null) {
      recordingNameTopic = "applications." + appId + ".tupleRecorder." + getStartTime();
      setupWsClient();
    }
  } catch (InterruptedException ex) {
    // keep the interrupt visible to the caller's thread instead of silently consuming it
    Thread.currentThread().interrupt();
    logger.error("Trouble setting up tuple recorder", ex);
  } catch (Exception ex) {
    logger.error("Trouble setting up tuple recorder", ex);
  }
}
/**
 * Registers a handler on the recording topic that keeps numSubscribers up to date:
 * every message carries the new subscriber count as a string, and closing resets it to zero.
 * Does nothing when no WebSocket client is set.
 */
private void setupWsClient() throws ExecutionException, IOException, InterruptedException, TimeoutException
{
  if (wsClient == null) {
    return;
  }
  Handler subscriberCountHandler = new Handler()
  {
    @Override
    public void onMessage(String type, String topic, Object data)
    {
      numSubscribers = Integer.valueOf((String)data);
      logger.info("Number of subscribers for recording started at {} is now {}", getStartTime(), numSubscribers);
    }

    @Override
    public void onClose()
    {
      numSubscribers = 0;
    }
  };
  wsClient.addHandler(recordingNameTopic, true, subscriberCountHandler);
}
/**
 * Marks the beginning of a window. A call with the id of the current window is ignored, so the
 * begin-window record is written once even when several ports report the same window.
 * A new window range is opened when the id does not directly follow the current one or when no
 * range exists yet; the previously open range, if any, is closed at the current window id.
 *
 * @param windowId id of the window that begins
 */
public void beginWindow(long windowId)
{
  if (this.currentWindowId == windowId) {
    return;
  }
  boolean consecutive = windowId == this.currentWindowId + 1;
  if (!consecutive || windowIdRanges.isEmpty()) {
    if (!windowIdRanges.isEmpty()) {
      windowIdRanges.get(windowIdRanges.size() - 1).high = this.currentWindowId;
    }
    Range range = new Range();
    range.low = windowId;
    windowIdRanges.add(range);
  }
  this.currentWindowId = windowId;
  endWindowTuplesProcessed = 0;
  try {
    String record = "B:" + System.currentTimeMillis() + ":" + windowId + "\n";
    storage.writeDataItem(record.getBytes(StandardCharsets.UTF_8), false);
  } catch (IOException ex) {
    // pass the exception itself so the stack trace is not lost
    logger.error("Exception caught in beginWindow", ex);
  }
}
/**
 * Counts one end-window notification. Once as many notifications as there are registered ports
 * have arrived for the current window, the end-window record is written and the data is flushed;
 * when flushData() returns false and a WebSocket client is set, the latest index line is published.
 * When a stop procedure is set, the remaining window budget is decremented and the procedure is run
 * once the budget is used up.
 */
public void endWindow()
{
  if (++endWindowTuplesProcessed == portMap.size()) {
    try {
      String record = "E:" + System.currentTimeMillis() + ":" + currentWindowId + "\n";
      storage.writeDataItem(record.getBytes(StandardCharsets.UTF_8), false);
      logger.debug("Got last end window tuple. Flushing...");
      if (!storage.flushData() && wsClient != null) {
        wsClient.publish(SharedPubSubWebSocketClient.LAST_INDEX_TOPIC_PREFIX + ".tuple." + storage.getBasePath(), storage.getLatestIndexLine());
      }
    } catch (IOException ex) {
      logger.error("Exception caught in endWindow", ex);
    }
  }
  // NOTE(review): this runs on every call, i.e. once per port per window, not once per completed
  // window -- confirm that numWindows is meant to be consumed that way
  if (stopProcedure != null && --numWindows <= 0) {
    stopProcedure.run();
  }
}
/**
 * Records one data tuple as {@code T:<time>:<port id>:<length>:<serialized tuple>} followed by a
 * newline, increments the tuple count of the port and, while there are subscribers, publishes the tuple.
 * A tuple that cannot be serialized is skipped and reported through the rate-limited error log;
 * any other failure while saving is logged and not propagated.
 *
 * @param obj the tuple
 * @param port name of the port the tuple was seen on
 * @throws RuntimeException if no window has begun yet
 */
public void writeTuple(Object obj, String port)
{
  ++totalTupleCount;
  if (windowIdRanges.isEmpty()) {
    throw new RuntimeException("Data tuples received from tuple recorder before any BEGIN_WINDOW");
  }
  Slice f;
  try {
    f = streamCodec.toByteArray(obj);
  } catch (RuntimeException ex) {
    checkLogTuple(ex, "save", obj);
    return;
  }
  try {
    PortInfo pi = portMap.get(port);
    String header = "T:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // explicit charset so the record bytes do not depend on the platform default
    bos.write(header.getBytes(StandardCharsets.UTF_8));
    bos.write(f.buffer, f.offset, f.length);
    bos.write('\n');
    // the PortCount instance lives in the map, so incrementing it in place is sufficient
    portCountMap.get(port).count++;
    storage.writeDataItem(bos.toByteArray(), true);
    if (numSubscribers > 0) {
      // this is not asynchronous. we need to fix this
      publishTupleData(pi.id, obj);
    }
  } catch (Exception ex) {
    logger.warn("Error saving tuple", ex);
  }
}
/**
 * Records one control tuple as {@code C:<time>:<port id>:<length>:<serialized tuple>} followed by a
 * newline. I/O failures are logged and not propagated.
 *
 * @param tuple the control tuple
 * @param port name of the port the tuple was seen on
 */
public void writeControlTuple(Tuple tuple, String port)
{
  try {
    PortInfo pi = portMap.get(port);
    Slice f = streamCodec.toByteArray(tuple);
    String header = "C:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // explicit charset so the record bytes do not depend on the platform default
    bos.write(header.getBytes(StandardCharsets.UTF_8));
    bos.write(f.buffer, f.offset, f.length);
    bos.write('\n');
    storage.writeDataItem(bos.toByteArray(), false);
  } catch (IOException ex) {
    // pass the exception itself so the stack trace is not lost
    logger.error("Exception caught in writeControlTuple", ex);
  }
}
/**
 * Formats window ranges as a comma separated list of {@code low-high} pairs.
 *
 * @param ranges the ranges to format
 * @return the formatted list; empty when there are no ranges
 */
private static String convertToString(List<Range> ranges)
{
  // a single builder avoids re-copying the whole string for every range
  StringBuilder result = new StringBuilder();
  for (Range range : ranges) {
    if (result.length() > 0) {
      result.append(',');
    }
    result.append(range.low).append('-').append(range.high);
  }
  return result.toString();
}
/**
 * Publishes a tuple with its port id, window id and running tuple count to the recording topic,
 * provided a WebSocket client is set and its connection is open. JSON processing failures go
 * through the rate-limited error log; every other failure is logged as a warning.
 *
 * @param portId id of the port the tuple was seen on
 * @param obj the tuple
 */
private void publishTupleData(int portId, Object obj)
{
  try {
    if (wsClient == null || !wsClient.isConnectionOpen()) {
      return;
    }
    HashMap<String, Object> message = new HashMap<>();
    message.put("portId", String.valueOf(portId));
    message.put("windowId", currentWindowId);
    message.put("tupleCount", totalTupleCount);
    message.put("data", obj);
    wsClient.publish(recordingNameTopic, message);
  } catch (Exception ex) {
    if (!(ex instanceof JsonProcessingException)) {
      logger.warn("Error publishing tuple", ex);
      return;
    }
    checkLogTuple(ex, "publish", obj);
  }
}
/**
 * Logs a serialization error unless one was logged fewer than ERROR_LOG_GAP tuples ago.
 *
 * @param ex the serialization failure
 * @param context short name of the operation that failed, used in the log message
 * @param tuple the tuple that could not be serialized
 */
private void checkLogTuple(Exception ex, String context, Object tuple)
{
  final boolean neverLogged = lastLog == -1;
  final boolean gapElapsed = (totalTupleCount - lastLog) >= ERROR_LOG_GAP;
  if (!neverLogged && !gapElapsed) {
    return;
  }
  lastLog = totalTupleCount;
  logger.warn("Error serializing during {} for tuple {} ", context, tuple, ex);
}
/**
 * Limits the recording and sets what to run when the limit is reached.
 *
 * @param numWindows budget that endWindow counts down
 * @param stopProcedure procedure endWindow runs once the budget is used up; null disables the limit
 */
public void setNumWindows(long numWindows, Runnable stopProcedure)
{
  this.stopProcedure = stopProcedure;
  this.numWindows = numWindows;
}
/**
 * Sink that forwards everything put into it to the enclosing recorder under a fixed port name.
 */
public class RecorderSink implements Sink<Object>
{
  private final String portName;
  private int count;

  /**
   * @param portName port name used when writing tuples received by this sink
   */
  public RecorderSink(String portName)
  {
    this.portName = portName;
  }

  /**
   * Records a payload. Control tuples are instances of Tuple; regular tuples emitted by
   * operators are not. A BEGIN_WINDOW tuple opens the window before it is written and an
   * END_WINDOW tuple closes it after it is written.
   *
   * @param payload the control or data tuple
   */
  @Override
  public void put(Object payload)
  {
    ++count;
    if (!(payload instanceof Tuple)) {
      writeTuple(payload, portName);
      return;
    }
    Tuple controlTuple = (Tuple)payload;
    MessageType type = controlTuple.getType();
    if (type == MessageType.BEGIN_WINDOW) {
      beginWindow(controlTuple.getWindowId());
    }
    writeControlTuple(controlTuple, portName);
    if (type == MessageType.END_WINDOW) {
      endWindow();
    }
  }

  /**
   * @param reset whether to zero the counter after reading it
   * @return the number of payloads put into this sink since the last reset
   */
  @Override
  public int getCount(boolean reset)
  {
    final int current = count;
    if (reset) {
      count = 0;
    }
    return current;
  }
}
}