blob: c14769f3fb0bbdccac17645bea55436c174b80bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import static org.junit.Assert.assertEquals;
import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.util.List;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBIOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPutInfluxDB {
private TestRunner runner;
private PutInfluxDB mockPutInfluxDB;
@Before
public void setUp() throws Exception {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "user");
runner.setProperty(PutInfluxDB.PASSWORD, "password");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testDefaultValid() {
runner.assertValid();
}
@Test
public void testBlankDBUrl() {
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "");
runner.assertNotValid();
}
@Test
public void testEmptyDBName() {
runner.setProperty(PutInfluxDB.DB_NAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyConnectionTimeout() {
runner.setProperty(PutInfluxDB.INFLUX_DB_CONNECTION_TIMEOUT, "");
runner.assertNotValid();
}
@Test
public void testEmptyUsername() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.PASSWORD, "password");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
runner.setProperty(PutInfluxDB.USERNAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyPassword() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "username");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
runner.setProperty(PutInfluxDB.PASSWORD, "");
runner.assertNotValid();
}
@Test
public void testPasswordEL() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setVariable("influxdb.password", "password");
runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}");
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "username");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
}
@Test
public void testUsernameEL() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setVariable("influxdb.username", "username");
runner.setProperty(PutInfluxDB.PASSWORD, "password");
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
}
@Test
public void testCharsetUTF8() {
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.assertValid();
}
@Test
public void testEmptyConsistencyLevel() {
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL, "");
runner.assertNotValid();
}
@Test
public void testCharsetBlank() {
runner.setProperty(PutInfluxDB.CHARSET, "");
runner.assertNotValid();
}
@Test
public void testZeroMaxDocumentSize() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "0");
runner.assertNotValid();
}
@Test
public void testSizeGreaterThanThresholdUsingEL() {
runner.setVariable("max.record.size", "1 B");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "${max.record.size}");
runner.assertValid();
byte [] bytes = new byte[2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length);
}
@Test
public void testSizeGreaterThanThreshold() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 B");
runner.assertValid();
byte [] bytes = new byte[2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length);
}
@Test
public void testValidSingleMeasurement() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null);
}
@Test
public void testWriteThrowsException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
throw new RuntimeException("WriteException");
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"WriteException");
}
@Test
public void testWriteThrowsIOException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
throw new InfluxDBIOException(new EOFException("EOFException"));
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.io.EOFException: EOFException");
}
@Test
public void testWriteThrowsSocketTimeoutException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
throw new InfluxDBIOException(new SocketTimeoutException("SocketTimeoutException"));
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_RETRY, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_RETRY);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.net.SocketTimeoutException: SocketTimeoutException");
}
@Test
public void testTriggerThrowsException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected InfluxDB getInfluxDB(ProcessContext context) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"testException");
}
@Test
public void testValidArrayMeasurement() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB");
runner.assertValid();
runner.enqueue("test rain=2\ntest rain=3".getBytes());
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null);
}
@Test
public void testInvalidEmptySingleMeasurement() {
byte [] bytes = "".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Empty measurement size 0");
}
}