blob: aa9cc6208909f4e112363e1f4ef35d6f02eb214c [file] [log] [blame]
package com.ai.cloud.skywalking.analysis.chainbuild;
import com.ai.cloud.skywalking.analysis.chainbuild.entity.CallChainTree;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.chainbuild.po.SpecificTimeCallTreeMergedChainIdContainer;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
public class ChainBuildReducer extends Reducer<Text, Text, Text, IntWritable> {
private Logger logger = LoggerFactory.getLogger(ChainBuildReducer.class);
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ConfigInitializer.initialize();
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
doReduceAction(Bytes.toString(key.getBytes()), values.iterator());
}
public void doReduceAction(String key, Iterator<Text> chainInfoIterator)
throws IOException, InterruptedException {
CallChainTree chainTree = CallChainTree.load(key);
SpecificTimeCallTreeMergedChainIdContainer container = new SpecificTimeCallTreeMergedChainIdContainer(chainTree.getTreeToken());
while (chainInfoIterator.hasNext()) {
String callChainData = chainInfoIterator.next().toString();
ChainInfo chainInfo = null;
try {
chainInfo = new Gson().fromJson(callChainData, ChainInfo.class);
container.addMergedChainIfNotContain(chainInfo);
chainTree.summary(chainInfo);
} catch (Exception e) {
e.printStackTrace();
logger.error(
"Failed to summary call chain, maybe illegal data:"
+ callChainData, e);
}
}
container.saveToHBase();
chainTree.saveToHbase();
}
}