blob: c694ab1c8faad73d15ef60ddc732a1bc6c4762f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestValidateCsv {
@Test
public void testHeaderAndSplit() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
runner.clearTransferState();
runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
runner.clearTransferState();
runner.enqueue("Name,Birthdate,Weight\nJohn,22/111954,63.2\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nBob,01/03/2004,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/111954,63.2");
}
@Test
public void testNullValues() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Null, Null, Null");
runner.enqueue("#Name,Birthdate,Weight\nJohn,\"\",63.2\nBob,,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,\"\",63.2\nBob,,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
@Test
public void testUniqueWithSplit() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
runner.enqueue("John\r\nBob\r\nBob\r\nJohn\r\nTom");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob\r\nTom");
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.total.lines", "5");
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.valid.lines", "3");
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn");
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.invalid.lines", "2");
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.total.lines", "5");
}
@Test
public void testValidDateOptionalDouble() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
runner.enqueue("John,22/11/1954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
"'22/111954' could not be parsed as a Date");
}
@Test
public void testIsIncludedIn() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), IsIncludedIn(\"male\", \"female\")");
runner.enqueue("John,22/11/1954,male\r\nMarie,01/03/2004,female");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testBigDecimalBoolCharIntLong() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.SCHEMA, "ParseBigDecimal(), ParseBool(), ParseChar(), ParseInt(), ParseLong()");
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,92147483647,92147483647");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testEqualsNotNullStrNotNullOrEmpty() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Equals(), NotNull(), StrNotNullOrEmpty()");
runner.enqueue("test,test,test\r\ntest,test,test");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("test,test,test\r\ntset,test,test");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testStrlenStrMinMaxStrRegex() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Strlen(4), StrMinMax(3,5), StrRegex(\"[a-z0-9\\._]+@[a-z0-9\\.]+\")");
runner.enqueue("test,test,test@apache.org");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("test,test,testapache.org");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
"'testapache.org' does not match the regular expression '[a-z0-9\\._]+@[a-z0-9\\.]+'");
}
@Test
public void testDMinMaxForbidSubStrLMinMax() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "DMinMax(10,100),LMinMax(10,100),ForbidSubStr(\"test\", \"tset\")");
runner.enqueue("50.001,50,hello");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("10,10,testapache.org");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testUnique() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Unique(), UniqueHashCode()");
runner.enqueue("1,2\r\n3,4");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("1,2\r\n1,4");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testRequire() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
int hashcode = "test".hashCode();
runner.setProperty(ValidateCsv.SCHEMA, "RequireHashCode(" + hashcode + "), RequireSubStr(\"test\")");
runner.enqueue("test,test");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("tset,tset");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testValidate() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.assertNotValid();
// We
runner.setProperty(ValidateCsv.SCHEMA, "RequireSubString(\"test\")");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "''");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "\"\"");
runner.assertNotValid();
}
@Test
public void testValidateWithEL() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, "${comma}");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "${crlf}");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "${quote}");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "RequireSubString(\"test\")");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "''");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "\"\"");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "${schema}");
runner.assertValid();
int hashcode = "test".hashCode();
runner.setVariable("schema", "RequireHashCode(" + hashcode + "), RequireSubStr(\"test\")");
runner.setVariable("comma", ",");
runner.setVariable("quote", "\"");
runner.setVariable("crlf", "\r\n");
runner.enqueue("test,test");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
}
@Test
public void testParseSchemaCommaBoundary() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.SCHEMA, "Null(),Null");
runner.assertValid();
}
@Test
public void testMultipleRuns() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
runner.enqueue("John\r\nBob\r\nTom");
runner.enqueue("John\r\nBob\r\nTom");
runner.run(2);
runner.assertTransferCount(ValidateCsv.REL_VALID, 2);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
@Test
public void testEscapingLineByLine() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
final String row = "Header1,\"Header2,escaped\",Header3\r\nField1,\"Field2,escaped\",Field3";
runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()");
runner.enqueue(row);
runner.run(1);
runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals(row);
runner.clearTransferState();
runner.setProperty(ValidateCsv.SCHEMA, "null,null,null");
runner.enqueue(row);
runner.run(1);
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals(row);
}
@Test
public void testQuote() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "NotNull(), NotNull(), NotNull()");
runner.enqueue("Header 1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Header 1, Header 2, Header 3\n\"Content 1a, Content 1b\", Content 2, Content 3");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
}