blob: 2d577bbe1eb70ff4d46e0a6c141b4013bd38a211 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.core.metric;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.metric.model.MetricValue;
import org.apache.griffin.core.util.JsonUtil;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
@Component
public class MetricStoreImpl implements MetricStore {
private static final String INDEX = "griffin";
private static final String TYPE = "accuracy";
private RestClient client;
private HttpHeaders responseHeaders;
private String urlGet;
private String urlDelete;
private String urlPost;
private ObjectMapper mapper;
private String indexMetaData;
public MetricStoreImpl(@Value("${elasticsearch.host}") String host,
@Value("${elasticsearch.port}") int port,
@Value("${elasticsearch.scheme:http}") String scheme,
@Value("${elasticsearch.user:}") String user,
@Value("${elasticsearch.password:}") String password) {
HttpHost httpHost = new HttpHost(host, port, scheme);
RestClientBuilder builder = RestClient.builder(httpHost);
if (!user.isEmpty() && !password.isEmpty()) {
String encodedAuth = buildBasicAuthString(user, password);
Header[] requestHeaders = new Header[]{
new BasicHeader(org.apache.http.HttpHeaders.AUTHORIZATION,
encodedAuth)};
builder.setDefaultHeaders(requestHeaders);
}
this.client = builder.build();
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentType(MediaType.APPLICATION_JSON);
this.responseHeaders = responseHeaders;
String urlBase = String.format("/%s/%s", INDEX, TYPE);
this.urlGet = urlBase.concat("/_search?filter_path=hits.hits._source");
this.urlPost = urlBase.concat("/_bulk");
this.urlDelete = urlBase.concat("/_delete_by_query");
this.indexMetaData = String.format(
"{ \"index\" : { \"_index\" : " +
"\"%s\",\"_type\" : \"%s\" } }%n",
INDEX,
TYPE);
this.mapper = new ObjectMapper();
}
@Override
public List<MetricValue> getMetricValues(String metricName, int from,
int size, long tmst)
throws IOException {
HttpEntity entity = getHttpEntityForSearch(metricName, from, size,
tmst);
try {
Response response = client.performRequest("GET", urlGet,
Collections.emptyMap(), entity);
return getMetricValuesFromResponse(response);
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 404) {
return Collections.emptyList();
}
throw e;
}
}
private HttpEntity getHttpEntityForSearch(String metricName, int from, int
size, long tmst)
throws JsonProcessingException {
Map<String, Object> map = new HashMap<>();
Map<String, Object> queryParam = new HashMap<>();
Map<String, Object> termQuery = Collections.singletonMap("name.keyword",
metricName);
queryParam.put("filter", Collections.singletonMap("term", termQuery));
Map<String, Object> sortParam = Collections
.singletonMap("tmst", Collections.singletonMap("order",
"desc"));
map.put("query", Collections.singletonMap("bool", queryParam));
map.put("sort", sortParam);
map.put("from", from);
map.put("size", size);
return new NStringEntity(JsonUtil.toJson(map),
ContentType.APPLICATION_JSON);
}
private List<MetricValue> getMetricValuesFromResponse(Response response)
throws IOException {
List<MetricValue> metricValues = new ArrayList<>();
JsonNode jsonNode = mapper.readTree(EntityUtils.toString(response
.getEntity()));
if (jsonNode.hasNonNull("hits") && jsonNode.get("hits")
.hasNonNull("hits")) {
for (JsonNode node : jsonNode.get("hits").get("hits")) {
JsonNode sourceNode = node.get("_source");
Map<String, Object> value = JsonUtil.toEntity(
sourceNode.get("value").toString(),
new TypeReference<Map<String, Object>>() {});
Map<String, Object> meta = JsonUtil.toEntity(
Objects.toString(sourceNode.get("metadata"), null),
new TypeReference<Map<String, Object>>() {});
MetricValue metricValue = new MetricValue(
sourceNode.get("name").asText(),
Long.parseLong(sourceNode.get("tmst").asText()),
meta,
value);
metricValues.add(metricValue);
}
}
return metricValues;
}
@Override
public ResponseEntity<?> addMetricValues(List<MetricValue> metricValues)
throws IOException {
String bulkRequestBody = getBulkRequestBody(metricValues);
HttpEntity entity = new NStringEntity(bulkRequestBody,
ContentType.APPLICATION_JSON);
Response response = client.performRequest("POST", urlPost,
Collections.emptyMap(), entity);
return getResponseEntityFromResponse(response);
}
private String getBulkRequestBody(List<MetricValue> metricValues) throws
JsonProcessingException {
StringBuilder bulkRequestBody = new StringBuilder();
for (MetricValue metricValue : metricValues) {
bulkRequestBody.append(indexMetaData);
bulkRequestBody.append(JsonUtil.toJson(metricValue));
bulkRequestBody.append(System.lineSeparator());
}
return bulkRequestBody.toString();
}
@Override
public ResponseEntity<?> deleteMetricValues(String metricName) throws
IOException {
Map<String, Object> param = Collections.singletonMap("query",
Collections.singletonMap("term",
Collections.singletonMap("name.keyword", metricName)));
HttpEntity entity = new NStringEntity(
JsonUtil.toJson(param),
ContentType.APPLICATION_JSON);
Response response = client.performRequest("POST", urlDelete,
Collections.emptyMap(), entity);
return getResponseEntityFromResponse(response);
}
private ResponseEntity<?> getResponseEntityFromResponse(Response response)
throws IOException {
String body = EntityUtils.toString(response.getEntity());
HttpStatus status = HttpStatus.valueOf(response.getStatusLine()
.getStatusCode());
return new ResponseEntity<>(body, responseHeaders, status);
}
private static String buildBasicAuthString(String user, String password) {
String auth = user + ":" + password;
return String.format("Basic %s", Base64.getEncoder().encodeToString(
auth.getBytes()));
}
@Override
public MetricValue getMetric(String applicationId) throws IOException {
Response response = client.performRequest(
"GET", urlGet,
Collections.singletonMap(
"q", "metadata.applicationId:" + applicationId));
List<MetricValue> metricValues = getMetricValuesFromResponse(response);
return metricValues.get(0);
}
}