blob: d2db51665736e8dcbf9112cfcc0c36359a7d1857 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.MockVariableRegistry;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Tests for {@link PrometheusRecordSink}.
 *
 * <p>Every test schedules a sink on the same fixed local port ({@link #PORT}), so each test
 * must stop the sink it started, even when an assertion fails. Otherwise the port stays in
 * use and later tests in this class fail for an unrelated reason.</p>
 */
public class TestPrometheusRecordSink {

    /** Port the sink under test is configured with (resolved through the {@code ${port}} variable). */
    private static final String PORT = "7077";

    /** URL the tests scrape to read back the metrics published by the sink. */
    private static final String METRICS_URL = "http://localhost:" + PORT + "/metrics";

    /**
     * Sends two records through the sink and verifies both the returned {@link WriteResult}
     * and the text served at the metrics endpoint.
     *
     * @throws IOException if the metrics endpoint cannot be read
     * @throws InitializationException if the sink cannot be initialized
     */
    @Test
    public void testSendData() throws IOException, InitializationException {
        final PrometheusRecordSink sink = initTask();
        try {
            final List<RecordField> recordFields = Arrays.asList(
                    new RecordField("field1", RecordFieldType.INT.getDataType()),
                    new RecordField("field2", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)),
                    new RecordField("field3", RecordFieldType.STRING.getDataType())
            );
            final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);

            final Map<String, Object> row1 = new HashMap<>();
            row1.put("field1", 15);
            row1.put("field2", BigDecimal.valueOf(12.34567D));
            row1.put("field3", "Hello");

            final Map<String, Object> row2 = new HashMap<>();
            row2.put("field1", 6);
            row2.put("field2", BigDecimal.valueOf(0.1234567890123456789D));
            row2.put("field3", "World!");

            final RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
                    new MapRecord(recordSchema, row1),
                    new MapRecord(recordSchema, row2)
            ));

            final Map<String, String> attributes = new HashMap<>();
            attributes.put("a", "Hello");

            final WriteResult writeResult = sink.sendData(recordSet, attributes, true);
            assertNotNull(writeResult);
            assertEquals(2, writeResult.getRecordCount());
            assertEquals("Hello", writeResult.getAttributes().get("a"));

            // The numeric fields are expected as metrics, labelled with the string field's value.
            final String content = getMetrics();
            assertTrue(content.contains("field1{field3=\"Hello\",} 15.0\nfield1{field3=\"World!\",} 6.0\n"));
            assertTrue(content.contains("field2{field3=\"Hello\",} 12.34567\nfield2{field3=\"World!\",} 0.12345678901234568\n"));
        } finally {
            // Always release the port, including when an assertion above fails.
            stopQuietly(sink);
        }
    }

    /**
     * Verifies that scheduling a second sink on the same port while the first one is still
     * running fails with a {@link ProcessException}.
     *
     * @throws Exception if initializing or stopping the first sink fails
     */
    @Test
    public void testTwoInstances() throws Exception {
        final PrometheusRecordSink sink1 = initTask();
        try {
            assertThrows(ProcessException.class, this::initTask);
        } finally {
            // Stop the first sink even if the assertion fails so the port is released.
            sink1.onStopped();
        }
    }

    /**
     * Fetches the body of the metrics endpoint after asserting that it answers with HTTP 200.
     *
     * @return the response body of {@link #METRICS_URL} as a string
     * @throws IOException if either HTTP request fails
     */
    private String getMetrics() throws IOException {
        // First request: only checks that the endpoint is reachable and returns 200.
        final HttpURLConnection con = (HttpURLConnection) new URL(METRICS_URL).openConnection();
        try {
            con.setRequestMethod("GET");
            assertEquals(HttpURLConnection.HTTP_OK, con.getResponseCode());
        } finally {
            con.disconnect();
        }

        // Second request: reads the actual metrics text.
        final HttpClient client = HttpClientBuilder.create().build();
        final HttpResponse response = client.execute(new HttpGet(METRICS_URL));
        final HttpEntity entity = response.getEntity();
        return EntityUtils.toString(entity);
    }

    /**
     * Creates a {@link PrometheusRecordSink}, initializes it with mock contexts configured
     * for {@link #PORT} and no SSL context service, and schedules it.
     *
     * @return the initialized and scheduled sink; the caller is responsible for stopping it
     * @throws InitializationException if {@code initialize} fails
     */
    private PrometheusRecordSink initTask() throws InitializationException {
        final ComponentLog logger = mock(ComponentLog.class);
        final PrometheusRecordSink task = new PrometheusRecordSink();
        final ConfigurationContext context = mock(ConfigurationContext.class);
        final StateManager stateManager = new MockStateManager(task);

        // The port property is supplied as an expression so it is resolved via the variable registry.
        final MockVariableRegistry variableRegistry = new MockVariableRegistry();
        variableRegistry.setVariable(new VariableDescriptor("port"), PORT);
        when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT))
                .thenReturn(new MockPropertyValue("${port}", null, variableRegistry));

        // No SSL context service is configured for these tests.
        final PropertyValue pValue = mock(StandardPropertyValue.class);
        when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue);
        when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);

        final ControllerServiceInitializationContext initContext =
                new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
        task.initialize(initContext);
        task.onScheduled(context);
        return task;
    }

    /**
     * Stops the given sink, ignoring any failure raised while stopping.
     *
     * @param sink the sink to stop
     */
    private static void stopQuietly(final PrometheusRecordSink sink) {
        try {
            sink.onStopped();
        } catch (Exception ignored) {
            // Do nothing, just need to shut down the server before the next run
        }
    }
}