blob: 325b951014571c4d47d5948bb62ab9ccf80b4152 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestListenTCPRecord {
static final String SCHEMA_TEXT = "{\n" +
" \"name\": \"syslogRecord\",\n" +
" \"namespace\": \"nifi\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" { \"name\": \"timestamp\", \"type\": \"string\" },\n" +
" { \"name\": \"logsource\", \"type\": \"string\" },\n" +
" { \"name\": \"message\", \"type\": \"string\" }\n" +
" ]\n" +
"}";
static final String DATA = "[" +
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
"]";
private static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);
private static final long TEST_TIMEOUT = 30000;
private static final String LOCALHOST = "localhost";
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
private static SSLContext keyStoreSslContext;
private static SSLContext trustStoreSslContext;
private TestRunner runner;
@BeforeClass
public static void configureServices() throws TlsException {
keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
}
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(ListenTCPRecord.class);
final String readerId = "record-reader";
final RecordReaderFactory readerFactory = new JsonTreeReader();
runner.addControllerService(readerId, readerFactory);
runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
runner.enableControllerService(readerFactory);
final String writerId = "record-writer";
final RecordSetWriterFactory writerFactory = new MockRecordWriter("timestamp, logsource, message");
runner.addControllerService(writerId, writerFactory);
runner.enableControllerService(writerFactory);
runner.setProperty(ListenTCPRecord.RECORD_READER, readerId);
runner.setProperty(ListenTCPRecord.RECORD_WRITER, writerId);
}
@Test
public void testCustomValidate() throws InitializationException {
runner.setProperty(ListenTCPRecord.PORT, "1");
runner.assertValid();
enableSslContextService(keyStoreSslContext);
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, "");
runner.assertNotValid();
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
runner.assertValid();
}
@Test(timeout = TEST_TIMEOUT)
public void testRunOneRecordPerFlowFile() throws IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
run(3, null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
final MockFlowFile flowFile = mockFlowFiles.get(i);
flowFile.assertAttributeEquals("record.count", "1");
final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
Assert.assertNotNull(content);
Assert.assertTrue(content.contains("This is a test " + (i + 1)));
}
}
@Test(timeout = TEST_TIMEOUT)
public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
run(1, null);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
final MockFlowFile flowFile = mockFlowFiles.get(0);
flowFile.assertAttributeEquals("record.count", "3");
final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
Assert.assertNotNull(content);
Assert.assertTrue(content.contains("This is a test " + 1));
Assert.assertTrue(content.contains("This is a test " + 2));
Assert.assertTrue(content.contains("This is a test " + 3));
}
@Test(timeout = TEST_TIMEOUT)
public void testRunClientAuthRequired() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
run(1, keyStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
final String content = new String(mockFlowFiles.get(0).toByteArray(), StandardCharsets.UTF_8);
Assert.assertNotNull(content);
Assert.assertTrue(content.contains("This is a test " + 1));
Assert.assertTrue(content.contains("This is a test " + 2));
Assert.assertTrue(content.contains("This is a test " + 3));
}
@Test(timeout = TEST_TIMEOUT)
public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext);
run(1, trustStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
final String content = new String(mockFlowFiles.get(0).toByteArray(), StandardCharsets.UTF_8);
Assert.assertNotNull(content);
Assert.assertTrue(content.contains("This is a test " + 1));
Assert.assertTrue(content.contains("This is a test " + 2));
Assert.assertTrue(content.contains("This is a test " + 3));
}
protected void run(final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
final int port = NetworkUtils.availablePort();
runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
// Run Processor and start listener without shutting down
runner.run(1, false, true);
final Thread thread = new Thread(() -> {
try (final Socket socket = getSocket(port, sslContext)) {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
} catch (final IOException e) {
LOGGER.error("Failed Sending Records to Port [{}]", port, e);
}
});
thread.start();
// Run Processor until success leveraging test method timeouts for failure status
int iterations = 0;
while (getSuccessCount() < expectedTransferred) {
runner.run(1, false, false);
iterations++;
final Optional<LogMessage> firstErrorMessage = runner.getLogger().getErrorMessages().stream().findFirst();
Assert.assertNull(firstErrorMessage.orElse(null));
}
LOGGER.info("Completed after iterations [{}]", iterations);
}
private int getSuccessCount() {
return runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
}
private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
final Socket socket;
if (sslContext == null) {
socket = new Socket(LOCALHOST, port);
} else {
socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
}
return socket;
}
private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER);
Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(SSL_CONTEXT_IDENTIFIER, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenTCPRecord.SSL_CONTEXT_SERVICE, SSL_CONTEXT_IDENTIFIER);
}
}