| package org.skywalking.apm.collector.worker.noderef.persistence; |
| |
| import com.google.gson.JsonArray; |
| import com.google.gson.JsonObject; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.elasticsearch.action.search.SearchRequestBuilder; |
| import org.elasticsearch.action.search.SearchResponse; |
| import org.elasticsearch.action.search.SearchType; |
| import org.elasticsearch.index.query.QueryBuilders; |
| import org.elasticsearch.search.aggregations.AggregationBuilders; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms; |
| import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; |
| import org.elasticsearch.search.aggregations.metrics.sum.Sum; |
| import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker; |
| import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider; |
| import org.skywalking.apm.collector.actor.ClusterWorkerContext; |
| import org.skywalking.apm.collector.actor.LocalWorkerContext; |
| import org.skywalking.apm.collector.actor.Role; |
| import org.skywalking.apm.collector.actor.WorkerException; |
| import org.skywalking.apm.collector.actor.selector.RollingSelector; |
| import org.skywalking.apm.collector.actor.selector.WorkerSelector; |
| import org.skywalking.apm.collector.worker.Const; |
| import org.skywalking.apm.collector.worker.TimeSlice; |
| import org.skywalking.apm.collector.worker.noderef.NodeRefResSumIndex; |
| import org.skywalking.apm.collector.worker.storage.EsClient; |
| |
| /** |
| * @author pengys5 |
| */ |
| public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker { |
| |
| private Logger logger = LogManager.getFormatterLogger(NodeRefResSumSearchWithTimeSlice.class); |
| |
| NodeRefResSumSearchWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { |
| super(role, clusterContext, selfContext); |
| } |
| |
| @Override |
| public void onWork(Object request, Object response) throws WorkerException { |
| if (request instanceof RequestEntity) { |
| RequestEntity search = (RequestEntity)request; |
| |
| SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.INDEX); |
| searchRequestBuilder.setTypes(search.getSliceType()); |
| searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); |
| searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefResSumIndex.TIME_SLICE).gte(search.getStartTime()).lte(search.getEndTime())); |
| searchRequestBuilder.setSize(0); |
| |
| TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(NodeRefResSumIndex.AGG_COLUMN).field(NodeRefResSumIndex.AGG_COLUMN); |
| aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.ONE_SECOND_LESS).field(NodeRefResSumIndex.ONE_SECOND_LESS)); |
| aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.THREE_SECOND_LESS).field(NodeRefResSumIndex.THREE_SECOND_LESS)); |
| aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.FIVE_SECOND_LESS).field(NodeRefResSumIndex.FIVE_SECOND_LESS)); |
| aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.FIVE_SECOND_GREATER).field(NodeRefResSumIndex.FIVE_SECOND_GREATER)); |
| aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.ERROR).field(NodeRefResSumIndex.ERROR)); |
| aggregationBuilder.subAggregation(AggregationBuilders.sum(NodeRefResSumIndex.SUMMARY).field(NodeRefResSumIndex.SUMMARY)); |
| |
| searchRequestBuilder.addAggregation(aggregationBuilder); |
| |
| SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); |
| |
| JsonArray nodeRefResSumArray = new JsonArray(); |
| Terms aggTerms = searchResponse.getAggregations().get(NodeRefResSumIndex.AGG_COLUMN); |
| for (Terms.Bucket bucket : aggTerms.getBuckets()) { |
| String aggId = String.valueOf(bucket.getKey()); |
| Sum oneSecondLess = bucket.getAggregations().get(NodeRefResSumIndex.ONE_SECOND_LESS); |
| Sum threeSecondLess = bucket.getAggregations().get(NodeRefResSumIndex.THREE_SECOND_LESS); |
| Sum fiveSecondLess = bucket.getAggregations().get(NodeRefResSumIndex.FIVE_SECOND_LESS); |
| Sum fiveSecondGreater = bucket.getAggregations().get(NodeRefResSumIndex.FIVE_SECOND_GREATER); |
| Sum error = bucket.getAggregations().get(NodeRefResSumIndex.ERROR); |
| Sum summary = bucket.getAggregations().get(NodeRefResSumIndex.SUMMARY); |
| logger.debug("aggId: %s, oneSecondLess: %s, threeSecondLess: %s, fiveSecondLess: %s, fiveSecondGreater: %s, error: %s, summary: %s", aggId, |
| oneSecondLess.getValue(), threeSecondLess.getValue(), fiveSecondLess.getValue(), fiveSecondGreater.getValue(), error.getValue(), summary.getValue()); |
| |
| JsonObject nodeRefResSumObj = new JsonObject(); |
| String[] ids = aggId.split(Const.IDS_SPLIT); |
| String front = ids[0]; |
| String behind = ids[1]; |
| |
| nodeRefResSumObj.addProperty("front", front); |
| nodeRefResSumObj.addProperty("behind", behind); |
| |
| nodeRefResSumObj.addProperty(NodeRefResSumIndex.ONE_SECOND_LESS, oneSecondLess.getValue()); |
| nodeRefResSumObj.addProperty(NodeRefResSumIndex.THREE_SECOND_LESS, threeSecondLess.getValue()); |
| nodeRefResSumObj.addProperty(NodeRefResSumIndex.FIVE_SECOND_LESS, fiveSecondLess.getValue()); |
| nodeRefResSumObj.addProperty(NodeRefResSumIndex.FIVE_SECOND_GREATER, fiveSecondGreater.getValue()); |
| nodeRefResSumObj.addProperty(NodeRefResSumIndex.ERROR, error.getValue()); |
| nodeRefResSumObj.addProperty(NodeRefResSumIndex.SUMMARY, summary.getValue()); |
| nodeRefResSumArray.add(nodeRefResSumObj); |
| } |
| |
| JsonObject resJsonObj = (JsonObject)response; |
| resJsonObj.add("result", nodeRefResSumArray); |
| } else { |
| logger.error("unhandled message, message instance must NodeRefResSumSearchWithTimeSlice.RequestEntity, but is %s", request.getClass().toString()); |
| } |
| } |
| |
| public static class RequestEntity extends TimeSlice { |
| public RequestEntity(String sliceType, long startTime, long endTime) { |
| super(sliceType, startTime, endTime); |
| } |
| } |
| |
| public static class Factory extends AbstractLocalSyncWorkerProvider<NodeRefResSumSearchWithTimeSlice> { |
| @Override |
| public Role role() { |
| return WorkerRole.INSTANCE; |
| } |
| |
| @Override |
| public NodeRefResSumSearchWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) { |
| return new NodeRefResSumSearchWithTimeSlice(role(), clusterContext, new LocalWorkerContext()); |
| } |
| } |
| |
| public enum WorkerRole implements Role { |
| INSTANCE; |
| |
| @Override |
| public String roleName() { |
| return NodeRefResSumSearchWithTimeSlice.class.getSimpleName(); |
| } |
| |
| @Override |
| public WorkerSelector workerSelector() { |
| return new RollingSelector(); |
| } |
| } |
| } |