blob: 2ae0f0bc77b34cd4cdc5b369bb54bb1e71b7b44b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue.clustered.client.async.nio;
import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
import org.apache.nifi.controller.queue.clustered.SimpleLimitThreshold;
import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
import org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestLoadBalanceSession {
private final TransactionFailureCallback NOP_FAILURE_CALLBACK = new TransactionFailureCallback() {
@Override
public void onTransactionFailed(final List<FlowFileRecord> flowFiles, final Exception cause, final TransactionPhase transactionPhase) {
}
@Override
public boolean isRebalanceOnFailure() {
return false;
}
};
private ByteArrayOutputStream received;
private ServerSocket serverSocket;
private int port;
@Before
public void setup() throws IOException {
received = new ByteArrayOutputStream();
serverSocket = new ServerSocket(0);
port = serverSocket.getLocalPort();
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try (final Socket socket = serverSocket.accept()) {
final InputStream in = socket.getInputStream();
int data;
socket.getOutputStream().write(LoadBalanceProtocolConstants.VERSION_ACCEPTED);
socket.getOutputStream().write(LoadBalanceProtocolConstants.SPACE_AVAILABLE);
socket.getOutputStream().write(LoadBalanceProtocolConstants.CONFIRM_CHECKSUM);
socket.getOutputStream().write(LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION);
while ((data = in.read()) != -1) {
received.write(data);
}
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
}
});
thread.setDaemon(true);
thread.start();
}
@After
public void shutdown() throws IOException {
serverSocket.close();
}
@Test(timeout = 10000)
public void testSunnyCase() throws InterruptedException, IOException {
final Queue<FlowFileRecord> flowFiles = new LinkedList<>();
final FlowFileRecord flowFile1 = new MockFlowFileRecord(5);
final FlowFileRecord flowFile2 = new MockFlowFileRecord(8);
flowFiles.offer(flowFile1);
flowFiles.offer(flowFile2);
final Map<FlowFileRecord, InputStream> contentMap = new HashMap<>();
contentMap.put(flowFile1, new ByteArrayInputStream("hello".getBytes()));
contentMap.put(flowFile2, new ByteArrayInputStream("good-bye".getBytes()));
final FlowFileContentAccess contentAccess = contentMap::get;
final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false,
flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port));
socketChannel.configureBlocking(false);
final PeerChannel peerChannel = new PeerChannel(socketChannel, null, "unit-test");
final LoadBalanceSession transaction = new LoadBalanceSession(partition, contentAccess, new StandardLoadBalanceFlowFileCodec(), peerChannel, 30000,
new SimpleLimitThreshold(100, 10_000_000));
Thread.sleep(100L);
while (transaction.communicate()) {
}
assertTrue(transaction.isComplete());
socketChannel.close();
final Checksum expectedChecksum = new CRC32();
final ByteArrayOutputStream expectedOut = new ByteArrayOutputStream();
expectedOut.write(1); // Protocol Version
final DataOutputStream expectedDos = new DataOutputStream(new CheckedOutputStream(expectedOut, expectedChecksum));
expectedDos.writeUTF("unit-test-connection");
expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
expectedDos.writeInt(flowFile1.getAttribute("uuid").length());
expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(5);
expectedDos.write("hello".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
expectedDos.writeInt(flowFile2.getAttribute("uuid").length());
expectedDos.write(flowFile2.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
expectedDos.writeLong(flowFile2.getPenaltyExpirationMillis()); // penalty expiration time
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(8);
expectedDos.write("good-bye".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
expectedDos.write(LoadBalanceProtocolConstants.NO_MORE_FLOWFILES);
expectedDos.writeLong(expectedChecksum.getValue());
expectedDos.write(LoadBalanceProtocolConstants.COMPLETE_TRANSACTION);
final byte[] expectedSent = expectedOut.toByteArray();
while (received.size() < expectedSent.length) {
Thread.sleep(10L);
}
final byte[] dataSent = received.toByteArray();
assertArrayEquals(expectedSent, dataSent);
assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getFlowFilesSent());
}
@Test(timeout = 10000)
public void testLargeContent() throws InterruptedException, IOException {
final byte[] content = new byte[66000];
for (int i=0; i < 66000; i++) {
content[i] = 'A';
}
final Queue<FlowFileRecord> flowFiles = new LinkedList<>();
final FlowFileRecord flowFile1 = new MockFlowFileRecord(content.length);
flowFiles.offer(flowFile1);
final Map<FlowFileRecord, InputStream> contentMap = new HashMap<>();
contentMap.put(flowFile1, new ByteArrayInputStream(content));
final FlowFileContentAccess contentAccess = contentMap::get;
final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false,
flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port));
socketChannel.configureBlocking(false);
final PeerChannel peerChannel = new PeerChannel(socketChannel, null, "unit-test");
final LoadBalanceSession transaction = new LoadBalanceSession(partition, contentAccess, new StandardLoadBalanceFlowFileCodec(), peerChannel, 30000,
new SimpleLimitThreshold(100, 10_000_000));
Thread.sleep(100L);
while (transaction.communicate()) {
}
socketChannel.close();
final Checksum expectedChecksum = new CRC32();
final ByteArrayOutputStream expectedOut = new ByteArrayOutputStream();
expectedOut.write(1); // Protocol Version
final DataOutputStream expectedDos = new DataOutputStream(new CheckedOutputStream(expectedOut, expectedChecksum));
expectedDos.writeUTF("unit-test-connection");
expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
expectedDos.writeInt(flowFile1.getAttribute("uuid").length());
expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
// first data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.write(Arrays.copyOfRange(content, 0, LoadBalanceSession.MAX_DATA_FRAME_SIZE));
// second data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(content.length - LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.write(Arrays.copyOfRange(content, LoadBalanceSession.MAX_DATA_FRAME_SIZE, content.length));
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
expectedDos.write(LoadBalanceProtocolConstants.NO_MORE_FLOWFILES);
expectedDos.writeLong(expectedChecksum.getValue());
expectedDos.write(LoadBalanceProtocolConstants.COMPLETE_TRANSACTION);
final byte[] expectedSent = expectedOut.toByteArray();
while (received.size() < expectedSent.length) {
Thread.sleep(10L);
}
final byte[] dataSent = received.toByteArray();
assertArrayEquals(expectedSent, dataSent);
assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
}
}