blob: 55d083f2412b28ce5af6e4dbcf0e21ae17330d53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.StringNettyEventSenderFactory;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestListenSyslog {
private static final String PRIORITY = "34";
private static final String TIMESTAMP = "Jan 31 23:59:59";
private static final String HOST = "localhost.localdomain";
private static final String BODY = String.class.getName();
private static final String VALID_MESSAGE = String.format("<%s>%s %s %s", PRIORITY, TIMESTAMP, HOST, BODY);
private static final String MIME_TYPE = "text/plain";
private static final boolean STOP_ON_FINISH_DISABLED = false;
private static final boolean STOP_ON_FINISH_ENABLED = true;
private static final boolean INITIALIZE_DISABLED = false;
private static final String LOCALHOST_ADDRESS = "127.0.0.1";
private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(15);
private static final Charset CHARSET = StandardCharsets.US_ASCII;
private TestRunner runner;
private ListenSyslog processor;
@Before
public void setRunner() {
processor = new ListenSyslog();
runner = TestRunners.newTestRunner(processor);
runner.setProperty(ListenSyslog.CHARSET, CHARSET.name());
}
@After
public void closeEventSender() {
processor.shutdownEventServer();
}
@Test
public void testUdpSslContextServiceInvalid() throws InitializationException {
runner.setProperty(ListenSyslog.PROTOCOL, TransportProtocol.UDP.toString());
final int port = NetworkUtils.getAvailableUdpPort();
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
final RestrictedSSLContextService sslContextService = mock(RestrictedSSLContextService.class);
final String identifier = RestrictedSSLContextService.class.getName();
when(sslContextService.getIdentifier()).thenReturn(identifier);
runner.addControllerService(identifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, identifier);
runner.assertNotValid();
}
@Test
public void testRunTcp() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
final TransportProtocol protocol = TransportProtocol.TCP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
assertSendSuccess(protocol, port);
}
@Test
public void testRunUdp() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final TransportProtocol protocol = TransportProtocol.UDP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
assertSendSuccess(protocol, port);
}
@Test
public void testRunUdpBatch() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final TransportProtocol protocol = TransportProtocol.UDP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
final String[] messages = new String[]{VALID_MESSAGE, VALID_MESSAGE};
runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, Integer.toString(messages.length));
runner.setProperty(ListenSyslog.PARSE_MESSAGES, Boolean.FALSE.toString());
runner.run(1, STOP_ON_FINISH_DISABLED);
sendMessages(protocol, port, LineEnding.NONE, messages);
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS);
assertEquals("Success FlowFiles not matched", 1, successFlowFiles.size());
final Long receivedCounter = runner.getCounterValue(ListenSyslog.RECEIVED_COUNTER);
assertEquals("Received Counter not matched", Long.valueOf(messages.length), receivedCounter);
final Long successCounter = runner.getCounterValue(ListenSyslog.SUCCESS_COUNTER);
assertEquals("Success Counter not matched", Long.valueOf(1), successCounter);
}
@Test
public void testRunUdpInvalid() throws Exception {
final int port = NetworkUtils.getAvailableUdpPort();
final TransportProtocol protocol = TransportProtocol.UDP;
runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.PORT, Integer.toString(port));
runner.run(1, STOP_ON_FINISH_DISABLED);
sendMessages(protocol, port, LineEnding.NONE, TIMESTAMP);
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
final List<MockFlowFile> invalidFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
assertEquals("Invalid FlowFiles not matched", 1, invalidFlowFiles.size());
final MockFlowFile flowFile = invalidFlowFiles.iterator().next();
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_SENDER.key(), LOCALHOST_ADDRESS);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol.toString());
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PORT.key(), Integer.toString(port));
final String content = flowFile.getContent();
assertEquals("FlowFile content not matched", TIMESTAMP, content);
}
private void assertSendSuccess(final TransportProtocol protocol, final int port) throws Exception {
runner.run(1, STOP_ON_FINISH_DISABLED);
sendMessages(protocol, port, LineEnding.UNIX, VALID_MESSAGE);
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
final List<MockFlowFile> invalidFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
assertTrue("Invalid FlowFiles found", invalidFlowFiles.isEmpty());
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS);
assertEquals("Success FlowFiles not matched", 1, successFlowFiles.size());
final MockFlowFile flowFile = successFlowFiles.iterator().next();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_SENDER.key(), LOCALHOST_ADDRESS);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PROTOCOL.key(), protocol.toString());
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PORT.key(), Integer.toString(port));
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_HOSTNAME.key(), HOST);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_BODY.key(), BODY);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_PRIORITY.key(), PRIORITY);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_TIMESTAMP.key(), TIMESTAMP);
flowFile.assertAttributeEquals(SyslogAttributes.SYSLOG_VALID.key(), Boolean.TRUE.toString());
flowFile.assertAttributeExists(SyslogAttributes.SYSLOG_FACILITY.key());
flowFile.assertAttributeExists(SyslogAttributes.SYSLOG_SEVERITY.key());
final Long receivedCounter = runner.getCounterValue(ListenSyslog.RECEIVED_COUNTER);
assertEquals("Received Counter not matched", Long.valueOf(1), receivedCounter);
final Long successCounter = runner.getCounterValue(ListenSyslog.SUCCESS_COUNTER);
assertEquals("Success Counter not matched", Long.valueOf(1), successCounter);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertFalse("Provenance Events not found", events.isEmpty());
final ProvenanceEventRecord eventRecord = events.iterator().next();
assertEquals(ProvenanceEventType.RECEIVE, eventRecord.getEventType());
final String transitUri = String.format("%s://%s:%d", protocol.toString().toLowerCase(), LOCALHOST_ADDRESS, port);
assertEquals("Provenance Transit URI not matched", transitUri, eventRecord.getTransitUri());
}
private void sendMessages(final TransportProtocol protocol, final int port, final LineEnding lineEnding, final String... messages) throws Exception {
final StringNettyEventSenderFactory eventSenderFactory = new StringNettyEventSenderFactory(runner.getLogger(), LOCALHOST_ADDRESS, port, protocol, CHARSET, lineEnding);
eventSenderFactory.setTimeout(SENDER_TIMEOUT);
try (final EventSender<String> eventSender = eventSenderFactory.getEventSender()) {
for (final String message : messages) {
eventSender.sendEvent(message);
}
}
}
}