blob: d7b7d5ca71906e9d936075193ed9ddbf7cee7782 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestRELPSocketChannelHandler {
private EventFactory<TestEvent> eventFactory;
private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
private ByteBufferSource byteBufferSource;
private BlockingQueue<TestEvent> events;
private ComponentLog logger = Mockito.mock(ComponentLog.class);
private int maxConnections;
private SSLContext sslContext;
private Charset charset;
private ChannelDispatcher dispatcher;
@Before
public void setup() {
eventFactory = new TestEventHolderFactory();
channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
byteBufferSource = new ByteBufferPool(1, 4096);
events = new LinkedBlockingQueue<>();
logger = Mockito.mock(ComponentLog.class);
maxConnections = 1;
sslContext = null;
charset = StandardCharsets.UTF_8;
dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
maxConnections, sslContext, charset);
}
@Test
public void testBasicHandling() throws IOException, InterruptedException {
final List<String> messages = new ArrayList<>();
messages.add("1 syslog 20 this is message 1234\n");
messages.add("2 syslog 22 this is message 456789\n");
messages.add("3 syslog 21 this is message ABCDE\n");
run(messages);
Assert.assertEquals(messages.size(), events.size());
boolean found1 = false;
boolean found2 = false;
boolean found3 = false;
TestEvent event;
while((event = events.poll()) != null) {
Map<String,String> metadata = event.metadata;
Assert.assertTrue(metadata.containsKey(RELPMetadata.TXNR_KEY));
final String txnr = metadata.get(RELPMetadata.TXNR_KEY);
if (txnr.equals("1")) {
found1 = true;
} else if (txnr.equals("2")) {
found2 = true;
} else if (txnr.equals("3")) {
found3 = true;
}
}
Assert.assertTrue(found1);
Assert.assertTrue(found2);
Assert.assertTrue(found3);
}
@Test
public void testLotsOfFrames() throws IOException, InterruptedException {
final String baseMessage = " syslog 19 this is message ";
final List<String> messages = new ArrayList<>();
for (int i=100; i < 1000; i++) {
messages.add(i + baseMessage + i + "\n");
}
run(messages);
Assert.assertEquals(messages.size(), events.size());
}
protected void run(List<String> messages) throws IOException, InterruptedException {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// starts the dispatcher listening on port 0 so it selects a random port
dispatcher.open(null, 0, 4096);
// starts a thread to run the dispatcher which will accept/read connections
Thread dispatcherThread = new Thread(dispatcher);
dispatcherThread.start();
// create a client connection to the port the dispatcher is listening on
final int realPort = dispatcher.getPort();
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost", realPort));
Thread.sleep(100);
// send the provided messages
for (int i=0; i < messages.size(); i++) {
buffer.clear();
buffer.put(messages.get(i).getBytes(charset));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
Thread.sleep(1);
}
}
// wait up to 25 seconds to verify the responses
long timeout = 25000;
long startTime = System.currentTimeMillis();
while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
Thread.sleep(100);
}
// should have gotten an event for each message sent
Assert.assertEquals(messages.size(), events.size());
} finally {
// stop the dispatcher thread and ensure we shut down handler threads
dispatcher.close();
}
}
// Test event to produce from the data
private static class TestEvent implements Event<SocketChannel> {
private byte[] data;
private Map<String,String> metadata;
public TestEvent(byte[] data, Map<String, String> metadata) {
this.data = data;
this.metadata = metadata;
}
@Override
public String getSender() {
return metadata.get(EventFactory.SENDER_KEY);
}
@Override
public byte[] getData() {
return data;
}
@Override
public ChannelResponder<SocketChannel> getResponder() {
return null;
}
}
// Factory to create test events and send responses for testing
private static class TestEventHolderFactory implements EventFactory<TestEvent> {
@Override
public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
return new TestEvent(data, metadata);
}
}
}