blob: dea87d97cfe6a5e481121c5738c3b252c0615af2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
/**
 * Unit tests for the {@link ControlRate} processor.
 *
 * <p>The tests cover the three rate control criteria used below (FlowFile count, data size and
 * attribute value), optional grouping by a FlowFile attribute, the per-trigger batch limit and
 * the handling of a non-numeric rate attribute.</p>
 *
 * <p>Several tests call {@link Thread#sleep(long)} to let a real rate window elapse, so they
 * depend on wall-clock time.</p>
 */
public class TestControlRate {

    /**
     * When queued FlowFiles belong to different groups and one group has hit its limit,
     * FlowFiles of the other group must still be transferred.
     */
    @Test
    public void testLimitExceededThenOtherLimitNotExceeded() {
        // If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
        // and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles
        // whose rate does not need to be throttled.
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
        runner.setProperty(ControlRate.MAX_RATE, "3");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
        runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");

        final Map<String, String> group1 = Collections.singletonMap("group", "1");
        final Map<String, String> group2 = Collections.singletonMap("group", "2");

        // Five FlowFiles for group 1 (two more than the limit of 3), followed by one for group 2.
        for (int i = 0; i < 5; i++) {
            runner.enqueue("test data", group1);
        }
        runner.enqueue("test data", group2);

        // A single trigger is expected to be enough: the assertions below require the throttled
        // group-1 FlowFiles to be skipped and the group-2 FlowFile to be pulled in the same run.
        runner.run();

        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
        final List<MockFlowFile> output = runner.getFlowFilesForRelationship(ControlRate.REL_SUCCESS);
        assertEquals(3L, countInGroup(output, "1"));
        assertEquals(1L, countInGroup(output, "2"));
    }

    /**
     * With a limit of 3 FlowFiles per second, the 4th FlowFile is held back until the
     * time period has elapsed.
     */
    @Test
    public void testFileCountRate() throws InterruptedException {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
        runner.setProperty(ControlRate.MAX_RATE, "3");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");

        runner.enqueue("test data 1");
        runner.enqueue("test data 2");
        runner.enqueue("test data 3");
        runner.enqueue("test data 4");

        runner.run(4, false);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
        runner.clearTransferState();

        // Further triggers inside the same period must not release anything.
        runner.run(50, false);
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueNotEmpty();

        // we have sent 3 files and after 1 second, we should be able to send the 4th
        Thread.sleep(1100L);
        runner.run();
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
        runner.assertQueueEmpty();
    }

    /**
     * With grouping enabled, the limit of 2 FlowFiles per second applies to each group
     * separately.
     */
    @Test
    public void testFileCountWithGrouping() throws InterruptedException {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
        runner.setProperty(ControlRate.MAX_RATE, "2");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
        runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");

        // Three FlowFiles per group, interleaved.
        createFlowFileWithGroup(runner, "one");
        createFlowFileWithGroup(runner, "two");
        createFlowFileWithGroup(runner, "one");
        createFlowFileWithGroup(runner, "two");
        createFlowFileWithGroup(runner, "one");
        createFlowFileWithGroup(runner, "two");

        runner.run(6, false);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
        runner.clearTransferState();

        // Further triggers inside the same period must not release anything.
        runner.run(50, false);
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueNotEmpty();

        // we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
        Thread.sleep(1100L);
        runner.run(2);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
        runner.assertQueueEmpty();
    }

    /**
     * With a limit of 20 bytes per second and 10-byte payloads, two FlowFiles are released
     * per period.
     */
    @Test
    public void testDataSizeRate() throws InterruptedException {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
        runner.setProperty(ControlRate.MAX_RATE, "20 b");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");

        // Each payload below is 10 ASCII characters long.
        runner.enqueue("testdata 1");
        runner.enqueue("testdata 2");
        runner.enqueue("testdata 3");
        runner.enqueue("testdata 4");

        runner.run(4, false);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
        runner.clearTransferState();

        // Further triggers inside the same period must not release anything.
        runner.run(50, false);
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueNotEmpty();

        // we have sent 20 bytes and after 1 second, we should be able to send 20 more
        Thread.sleep(1100L);
        runner.run(2, false);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
        runner.assertQueueEmpty();
    }

    /**
     * Rate control driven by the numeric value of the "count" attribute, with a limit of
     * 20,000 per second.
     */
    @Test
    public void testViaAttribute() throws InterruptedException {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
        runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
        runner.setProperty(ControlRate.MAX_RATE, "20000");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");

        createFlowFile(runner, 1000);
        createFlowFile(runner, 3000);
        createFlowFile(runner, 5000);
        createFlowFile(runner, 20000);
        createFlowFile(runner, 1000);

        runner.run(5, false);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
        runner.clearTransferState();

        // at this point, we have sent through 29,000 but our max is 20,000 per second.
        // After 1.45 seconds (29000 / 20000), we should be able to send another 20,000
        runner.run(50, false);
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueNotEmpty();

        Thread.sleep(1200L);
        // at this point, more than TIME_PERIOD 1.0 seconds but less than 1.45 seconds have passed
        runner.run(50, false);
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueNotEmpty();

        Thread.sleep(600L);
        // at this point, more than 1.45 seconds have passed, so we should be able to send another 20,000
        runner.run();
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueEmpty();
    }

    /**
     * A FlowFile whose rate attribute is not a number must be routed to failure.
     */
    @Test
    public void testBadAttributeRate() {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
        runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
        runner.setProperty(ControlRate.MAX_RATE, "20000");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");

        enqueueEmptyFlowFile(runner, "count", "bad string");

        runner.run();
        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 1);
        runner.assertQueueEmpty();
    }

    /**
     * A single trigger must transfer at most {@code MAX_FLOW_FILES_PER_BATCH} FlowFiles even
     * when the configured rate would allow more.
     */
    @Test
    public void testBatchLimit() {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
        runner.setProperty(ControlRate.MAX_RATE, "5555");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");

        final int testFileCount = 1500;
        for (int i = 0; i < testFileCount; i++) {
            runner.enqueue("test data " + i);
        }

        runner.run(1, false);
        // after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of testFileCount in queue
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueNotEmpty();
        assertEquals(testFileCount - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount());

        runner.run(1, false);
        // after 2 runs should have testFileCount files transferred and 0 in queue
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, testFileCount);
        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
        runner.assertQueueEmpty();
    }

    /**
     * FlowFiles without the grouping attribute must still be transferred alongside FlowFiles
     * that carry it.
     */
    @Test
    public void testNonExistingGroupAttribute() {
        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
        runner.setProperty(ControlRate.MAX_RATE, "2");
        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
        runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");

        createFlowFileWithGroup(runner, "one");
        createFlowFile(runner, 1); // no group set on this flow file
        createFlowFileWithGroup(runner, "one");
        createFlowFile(runner, 2); // no group set on this flow file

        runner.run(4, false);
        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
        runner.assertQueueEmpty();
    }

    /**
     * Counts the FlowFiles whose "group" attribute equals the given value.
     *
     * <p>The comparison is written constant-first so that a FlowFile lacking the attribute is
     * simply not counted instead of raising a {@link NullPointerException}.</p>
     *
     * @param flowFiles the FlowFiles to inspect
     * @param group the expected value of the "group" attribute
     * @return the number of matching FlowFiles
     */
    private static long countInGroup(final List<MockFlowFile> flowFiles, final String group) {
        return flowFiles.stream()
                .filter(ff -> group.equals(ff.getAttribute("group")))
                .count();
    }

    /**
     * Enqueues an empty FlowFile whose "count" attribute holds the given value.
     *
     * @param runner the runner to enqueue into
     * @param value the number stored, as a string, in the "count" attribute
     */
    private static void createFlowFile(final TestRunner runner, final int value) {
        enqueueEmptyFlowFile(runner, "count", String.valueOf(value));
    }

    /**
     * Enqueues an empty FlowFile whose "group" attribute holds the given value.
     *
     * @param runner the runner to enqueue into
     * @param group the value stored in the "group" attribute
     */
    private static void createFlowFileWithGroup(final TestRunner runner, final String group) {
        enqueueEmptyFlowFile(runner, "group", group);
    }

    /**
     * Enqueues a zero-length FlowFile carrying exactly one attribute.
     *
     * @param runner the runner to enqueue into
     * @param attributeName the name of the attribute to set
     * @param attributeValue the value of the attribute to set
     */
    private static void enqueueEmptyFlowFile(final TestRunner runner, final String attributeName, final String attributeValue) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(attributeName, attributeValue);
        runner.enqueue(new byte[0], attributes);
    }
}