blob: 975858a47d3f56c01d03a6dcb170eb0e8948b27c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestDistributeLoad {
@BeforeClass
public static void before() {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DistributeLoad", "debug");
}
@Test
public void testDefaultRoundRobin() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
for (int i = 0; i < 101; i++) {
testRunner.enqueue(new byte[0]);
}
testRunner.run(101);
testRunner.assertTransferCount("1", 2);
for (int i = 2; i <= 100; i++) {
testRunner.assertTransferCount(String.valueOf(i), 1);
}
}
@Test
public void testWeightedRoundRobin() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
testRunner.setProperty("1", "5");
testRunner.setProperty("2", "3");
for (int i = 0; i < 106; i++) {
testRunner.enqueue(new byte[0]);
}
testRunner.run(108);
testRunner.assertTransferCount("1", 5);
testRunner.assertTransferCount("2", 3);
for (int i = 3; i <= 100; i++) {
testRunner.assertTransferCount(String.valueOf(i), 1);
}
}
@Test
public void testValidationOnAddedProperties() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
testRunner.setProperty("1", "5");
try {
testRunner.setProperty("1", "0");
Assert.fail("Allows property '1' to be set to '0'");
} catch (final AssertionError e) {
// expected behavior
}
try {
testRunner.setProperty("1", "-1");
Assert.fail("Allows property '1' to be set to '-1'");
} catch (final AssertionError e) {
// expected behavior
}
testRunner.setProperty("1", "101");
testRunner.setProperty("100", "5");
try {
testRunner.setProperty("101", "5");
Assert.fail("Allows property '101' to be set to '5'");
} catch (final AssertionError e) {
// expected behavior
}
try {
testRunner.setProperty("0", "5");
Assert.fail("Allows property '0' to be set to '5'");
} catch (final AssertionError e) {
// expected behavior
}
try {
testRunner.setProperty("-1", "5");
Assert.fail("Allows property '-1' to be set to '5'");
} catch (final AssertionError e) {
// expected behavior
}
}
@Test
public void testNextAvailable() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100");
testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE);
for (int i = 0; i < 99; i++) {
testRunner.enqueue(new byte[0]);
}
testRunner.setRelationshipUnavailable("50");
testRunner.run(101);
testRunner.assertQueueEmpty();
for (int i = 1; i <= 100; i++) {
System.out.println(i);
testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
}
}
@Test
public void testFlowFileAttributesAdded() {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY, DistributeLoad.STRATEGY_NEXT_AVAILABLE);
for (int i = 0; i < 100; i++) {
testRunner.enqueue(new byte[0]);
}
testRunner.run(101);
testRunner.assertQueueEmpty();
for (int i = 1; i <= 100; i++) {
testRunner.assertTransferCount(String.valueOf(i), 1);
final List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(String.valueOf(i));
assertEquals(1, flowFilesForRelationship.size());
final MockFlowFile mockFlowFile = flowFilesForRelationship.get(0);
assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
}
}
}