blob: 1ccad68cb56fbf71a618bde0c1624d770f3cb793 [file] [log] [blame]
package org.skywalking.apm.collector.worker.noderef.persistence;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.TimeSlice;
import org.skywalking.apm.collector.worker.noderef.NodeRefIndex;
import org.skywalking.apm.collector.worker.storage.EsClient;
/**
* @author pengys5
*/
public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefSearchWithTimeSlice.class);
NodeRefSearchWithTimeSlice(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefIndex.INDEX);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeRefIndex.TIME_SLICE).gte(search.getStartTime()).lte(search.getEndTime()));
searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(AggregationBuilders.terms(NodeRefIndex.AGG_COLUMN).field(NodeRefIndex.AGG_COLUMN).size(200));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Terms genders = searchResponse.getAggregations().get(NodeRefIndex.AGG_COLUMN);
JsonArray nodeRefArray = new JsonArray();
for (Terms.Bucket entry : genders.getBuckets()) {
String aggId = entry.getKeyAsString();
String[] aggIds = aggId.split(Const.IDS_SPLIT);
String front = aggIds[0];
String behind = aggIds[1];
JsonObject nodeRefObj = new JsonObject();
nodeRefObj.addProperty(NodeRefIndex.FRONT, front);
nodeRefObj.addProperty(NodeRefIndex.BEHIND, behind);
nodeRefArray.add(nodeRefObj);
}
logger.debug("node ref data: %s", nodeRefArray.toString());
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", nodeRefArray);
} else {
logger.error("unhandled message, message instance must NodeRefSearchWithTimeSlice.RequestEntity, but is %s", request.getClass().toString());
}
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long startTime, long endTime) {
super(sliceType, startTime, endTime);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<NodeRefSearchWithTimeSlice> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefSearchWithTimeSlice workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefSearchWithTimeSlice(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefSearchWithTimeSlice.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}