blob: 137b9698dfaf3e082b2636391ffc3b73406e0e29 [file] [log] [blame]
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.worker.storage.RecordAnalysisData;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class RecordAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(RecordAnalysisMember.class);
private RecordAnalysisData recordAnalysisData = new RecordAnalysisData();
public RecordAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final public void set(String id, JsonObject record) throws Exception {
getRecordAnalysisData().getOrCreate(id).set(record);
}
private RecordAnalysisData getRecordAnalysisData() {
return recordAnalysisData;
}
@Override final protected void aggregation() throws Exception {
getRecordAnalysisData().asMap().forEach((key, value) -> {
try {
aggWorkRefs().tell(value);
} catch (Exception e) {
logger.error(e);
}
});
getRecordAnalysisData().asMap().clear();
}
protected abstract WorkerRefs aggWorkRefs();
}