blob: 9c98ca6ac78b62668fc84f0cd0c48f14a60d04b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestSampleRecord {
@Test
public void testIntervalSampling() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.INTERVAL_SAMPLING_KEY);
runner.assertNotValid();
runner.setProperty(SampleRecord.SAMPLING_INTERVAL, "4");
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
for (int i = 0; i < 100; i++) {
readerService.addRecord(UUID.randomUUID(), 48);
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "25");
runner.clearTransferState();
runner.setProperty(SampleRecord.SAMPLING_INTERVAL, "1");
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "100");
}
@Test
public void testIntervalSamplingWithNoRecords() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.INTERVAL_SAMPLING_KEY);
runner.assertNotValid();
runner.setProperty(SampleRecord.SAMPLING_INTERVAL, "0");
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
for (int i = 0; i < 100; i++) {
readerService.addRecord(UUID.randomUUID(), 48);
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "0");
}
@Test
public void testProbabilisticSamplingWithSeed() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.PROBABILISTIC_SAMPLING_KEY);
runner.assertNotValid();
runner.setProperty(SampleRecord.SAMPLING_PROBABILITY, "10");
runner.setProperty(SampleRecord.RANDOM_SEED, "1");
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
Random randomAge = new Random();
for (int i = 0; i < 100; i++) {
readerService.addRecord(UUID.randomUUID(), randomAge.nextInt());
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile first = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
String firstContent = new String(runner.getContentAsByteArray(first));
runner.clearTransferState();
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile second = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
String secondContent = new String(runner.getContentAsByteArray(second));
assertEquals(firstContent, secondContent);
}
@Test
public void testProbabilisticSamplingWithAllRecords() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.PROBABILISTIC_SAMPLING_KEY);
runner.assertNotValid();
runner.setProperty(SampleRecord.SAMPLING_PROBABILITY, "100");
RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
readerService.addSchemaField(nameField);
readerService.addSchemaField(ageField);
List<RecordField> recordFieldList = Arrays.asList(nameField, ageField);
RecordSchema recordSchema = new SimpleRecordSchema(recordFieldList);
Random randomAge = new Random();
final List<Record> records = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
HashMap<String, Object> recordMap = new HashMap<>();
Object name = UUID.randomUUID();
Object age = randomAge.nextInt();
recordMap.put("name", name);
recordMap.put("age", age);
records.add(new MapRecord(recordSchema, recordMap));
readerService.addRecord(name, age);
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile first = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
String[] content = new String(runner.getContentAsByteArray(first)).split(".*\n");
for (int i = 0; i < content.length; i++) {
assertEquals(content[i], records.get(i).toString());
}
}
@Test
public void testProbabilisticSamplingWithNoRecords() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.PROBABILISTIC_SAMPLING_KEY);
runner.assertNotValid();
runner.setProperty(SampleRecord.SAMPLING_PROBABILITY, "0");
RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
readerService.addSchemaField(nameField);
readerService.addSchemaField(ageField);
Random randomAge = new Random();
for (int i = 0; i < 100; i++) {
readerService.addRecord(UUID.randomUUID(), randomAge.nextInt());
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile first = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
String content = new String(runner.getContentAsByteArray(first));
assertTrue(content.isEmpty());
}
@Test
public void testReservoirSampling() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.RESERVOIR_SAMPLING_KEY);
runner.assertNotValid();
runner.setProperty(SampleRecord.RESERVOIR_SIZE, "10");
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
for (int i = 0; i < 100; i++) {
readerService.addRecord(UUID.randomUUID(), 48);
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "10");
}
@Test
public void testReservoirSamplingWithSeed() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.enableControllerService(writerService);
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.RESERVOIR_SAMPLING_KEY);
runner.setProperty(SampleRecord.RANDOM_SEED, "1");
runner.assertNotValid();
runner.setProperty(SampleRecord.RESERVOIR_SIZE, "10");
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
for (int i = 0; i < 100; i++) {
readerService.addRecord(UUID.randomUUID(), 48);
}
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile first = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
String firstContent = new String(runner.getContentAsByteArray(first));
first.assertAttributeEquals("record.count", "10");
runner.clearTransferState();
runner.enqueue("");
runner.run();
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
final MockFlowFile second = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
String secondContent = new String(runner.getContentAsByteArray(second));
assertEquals(firstContent, secondContent);
}
}