blob: c07ced2392ba90ae93e560e405ccf283ec3eb809 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue.clustered;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.controller.queue.clustered.server.NotAuthorizedException;
import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class LoadBalancedQueueIT {
// Authorizer that never rejects: returns null when no SSL socket is supplied, otherwise a fixed identity string.
private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com";
// Authorizer that rejects every call by throwing NotAuthorizedException.
private final LoadBalanceAuthorizer NEVER_AUTHORIZED = (sslSocket) -> {
throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized");
};
// Swap manager handed to every SocketLoadBalancedFlowFileQueue created by the tests.
private final MockSwapManager flowFileSwapManager = new MockSwapManager();
// Identifier used both for the mocked Connection and for the client-side queue.
private final String queueId = "unit-test";
private final EventReporter eventReporter = EventReporter.NO_OP;
// Swap threshold passed to the client-side queue constructor.
private final int swapThreshold = 10_000;
// Backing set for the mocked ClusterCoordinator.getNodeIdentifiers(); tests add nodes to it directly.
private Set<NodeIdentifier> nodeIdentifiers;
private ClusterCoordinator clusterCoordinator;
// Returned by the mocked ClusterCoordinator.getLocalNodeIdentifier(); assigned at the start of each test.
private NodeIdentifier localNodeId;
private ProcessScheduler processScheduler;
private ResourceClaimManager resourceClaimManager;
// Mocked queue returned by the mocked Connection; reports not-full and the compression held in compressionReference.
private LoadBalancedFlowFileQueue serverQueue;
private FlowController flowController;
// --- Client-side (sending) repositories and the data captured from them ---
private ProvenanceRepository clientProvRepo;
private ContentRepository clientContentRepo;
// Records collected for the client FlowFile repository (passed to createFlowFileRepository in setup).
private List<RepositoryRecord> clientRepoRecords;
private FlowFileRepository clientFlowFileRepo;
// Content bytes keyed by claim for the client side; populated by createContentClaim.
private ConcurrentMap<ContentClaim, byte[]> clientClaimContents;
// --- Server-side (receiving) repositories and the data captured from them ---
private ProvenanceRepository serverProvRepo;
// Records collected for the server FlowFile repository (passed to createFlowFileRepository in setup).
private List<RepositoryRecord> serverRepoRecords;
private FlowFileRepository serverFlowFileRepo;
// Content bytes keyed by claim for the server side (passed to createContentRepository in setup).
private ConcurrentMap<ContentClaim, byte[]> serverClaimContents;
private ContentRepository serverContentRepo;
// SSLContext built in setup from a temporary key store; several tests shadow it with a null local.
private SSLContext sslContext;
// Listeners captured from calls to the mocked ClusterCoordinator.registerEventListener(...).
private final Set<ClusterTopologyEventListener> clusterEventListeners = Collections.synchronizedSet(new HashSet<>());
// Compression mode reported by the mocked server queue; reset to DO_NOT_COMPRESS in setup.
private final AtomicReference<LoadBalanceCompression> compressionReference = new AtomicReference<>();
/**
 * Rebuilds all mocks and in-memory repositories before each test.
 * <p>
 * The mocked {@link ClusterCoordinator} answers from the mutable {@code nodeIdentifiers} set and the
 * {@code localNodeId} field, reports every node as CONNECTED, and records every registered
 * {@link ClusterTopologyEventListener} in {@code clusterEventListeners}. The mocked {@link FlowManager}
 * resolves any connection id to a single mocked {@link Connection} whose queue is {@code serverQueue}.
 *
 * @throws IOException declared by the repository factory helpers
 * @throws GeneralSecurityException if the temporary TLS configuration cannot be turned into an SSLContext
 */
@Before
public void setup() throws IOException, GeneralSecurityException {
    compressionReference.set(LoadBalanceCompression.DO_NOT_COMPRESS);

    nodeIdentifiers = new HashSet<>();
    clusterCoordinator = mock(ClusterCoordinator.class);
    // Answer with a defensive copy so callers never observe later mutations of the test's set.
    when(clusterCoordinator.getNodeIdentifiers()).thenAnswer(invocation -> new HashSet<>(nodeIdentifiers));
    // Resolved lazily because localNodeId is only assigned inside each test method.
    when(clusterCoordinator.getLocalNodeIdentifier()).thenAnswer(invocation -> localNodeId);
    when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenAnswer(invocation ->
        new NodeConnectionStatus(invocation.getArgument(0, NodeIdentifier.class), NodeConnectionState.CONNECTED));

    // Capture listeners so tests can fire topology events (e.g. onNodeAdded) by hand.
    clusterEventListeners.clear();
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(final InvocationOnMock invocation) {
            clusterEventListeners.add(invocation.getArgument(0, ClusterTopologyEventListener.class));
            return null;
        }
    }).when(clusterCoordinator).registerEventListener(any(ClusterTopologyEventListener.class));

    processScheduler = mock(ProcessScheduler.class);
    clientProvRepo = mock(ProvenanceRepository.class);
    resourceClaimManager = new StandardResourceClaimManager();

    final Connection connection = mock(Connection.class);
    when(connection.getIdentifier()).thenReturn(queueId);

    serverQueue = mock(LoadBalancedFlowFileQueue.class);
    when(serverQueue.isFull()).thenReturn(false);
    when(connection.getFlowFileQueue()).thenReturn(serverQueue);
    // Read through the reference so a test can change the compression mode after setup.
    doAnswer(invocation -> compressionReference.get()).when(serverQueue).getLoadBalanceCompression();

    flowController = mock(FlowController.class);
    final FlowManager flowManager = mock(FlowManager.class);
    when(flowManager.getConnection(Mockito.anyString())).thenReturn(connection);
    when(flowController.getFlowManager()).thenReturn(flowManager);

    // Create repos for the server
    serverRepoRecords = Collections.synchronizedList(new ArrayList<>());
    serverFlowFileRepo = createFlowFileRepository(serverRepoRecords);
    serverClaimContents = new ConcurrentHashMap<>();
    serverContentRepo = createContentRepository(serverClaimContents);
    serverProvRepo = mock(ProvenanceRepository.class);

    // Create repos for the client
    clientClaimContents = new ConcurrentHashMap<>();
    clientContentRepo = createContentRepository(clientClaimContents);
    clientRepoRecords = Collections.synchronizedList(new ArrayList<>());
    clientFlowFileRepo = createFlowFileRepository(clientRepoRecords);

    final TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
    sslContext = SslContextFactory.createSslContext(tlsConfiguration);
}
/**
 * Creates a mocked {@link ContentClaim} backed by a mocked {@link ResourceClaim} with fixed
 * container/section/id values.
 *
 * @param bytes content to register for the claim in {@code clientClaimContents}; when {@code null}
 *              nothing is registered
 * @return the mocked content claim
 */
private ContentClaim createContentClaim(final byte[] bytes) {
    final ResourceClaim mockedResourceClaim = mock(ResourceClaim.class);
    when(mockedResourceClaim.getContainer()).thenReturn("container");
    when(mockedResourceClaim.getSection()).thenReturn("section");
    when(mockedResourceClaim.getId()).thenReturn("identifier");

    final ContentClaim claim = mock(ContentClaim.class);
    when(claim.getResourceClaim()).thenReturn(mockedResourceClaim);

    // No content supplied: hand back the bare claim without registering any bytes.
    if (bytes == null) {
        return claim;
    }

    clientClaimContents.put(claim, bytes);
    return claim;
}
/**
 * Builds the client factory used by the tests. FlowFile content is read from the client-side
 * content repository via the FlowFile's content claim.
 *
 * @param sslContext the SSLContext handed to the factory; tests pass {@code null} or the context built in setup
 * @return a new {@link NioAsyncLoadBalanceClientFactory}
 */
private NioAsyncLoadBalanceClientFactory createClientFactory(final SSLContext sslContext) {
    final FlowFileContentAccess contentAccess = flowFile -> clientContentRepo.read(flowFile.getContentClaim());
    final StandardLoadBalanceFlowFileCodec codec = new StandardLoadBalanceFlowFileCodec();

    return new NioAsyncLoadBalanceClientFactory(
        sslContext,
        30000,
        contentAccess,
        eventReporter,
        codec,
        clusterCoordinator);
}
/**
 * Starts load balancing with only the local node known, then brings up five servers one at a time.
 * Each new node is announced to the captured topology listeners and two FlowFiles are enqueued after
 * every announcement. Asserts that seven FlowFiles reach the server-side repository carrying the
 * expected attribute, and that the client repository records seven DELETE records.
 *
 * @throws IOException if a server or the client registry fails to start
 * @throws InterruptedException if the polling sleeps are interrupted
 */
@Test(timeout = 20_000)
public void testNewNodeAdded() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Create the protocol shared by every server started below
    final int timeoutMillis = 1000;
    final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    // Deliberately shadows the sslContext field: this test hands a null SSLContext to both client and servers.
    final SSLContext sslContext = null;

    final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
    clientRegistry.start();

    final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
    when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
    when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);

    final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
    final Thread clientThread = new Thread(clientTask);
    // Daemon, like the client threads in the other tests, so a task that fails to stop cannot keep the JVM alive.
    clientThread.setDaemon(true);

    // Create the Load Balanced FlowFile Queue
    final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
        clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
    flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

    final int serverCount = 5;
    final ConnectionLoadBalanceServer[] servers = new ConnectionLoadBalanceServer[serverCount];

    try {
        flowFileQueue.startLoadBalancing();
        clientThread.start();

        for (int i = 0; i < serverCount; i++) {
            // Port 0 is passed and the actual port is read back from the server after start.
            final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 8, loadBalanceProtocol, eventReporter, timeoutMillis);
            servers[i] = server;
            server.start();

            final int loadBalancePort = server.getPort();

            // Register the new node and announce it to the listeners captured by the mocked coordinator
            final NodeIdentifier nodeId = new NodeIdentifier("unit-test-" + i, "localhost", 8090 + i, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
            nodeIdentifiers.add(nodeId);
            clusterEventListeners.forEach(listener -> listener.onNodeAdded(nodeId));

            for (int j = 0; j < 2; j++) {
                final Map<String, String> attributes = new HashMap<>();
                attributes.put("greeting", "hello");

                final MockFlowFileRecord flowFile = new MockFlowFileRecord(attributes, 0L);
                flowFileQueue.put(flowFile);
            }
        }

        // NOTE(review): 10 FlowFiles are enqueued in total; 7 is presumably how many the round-robin
        // partitioner routes to remote nodes while the rest stay local - confirm against the partitioner.
        final int totalFlowFileCount = 7;

        // Wait up to 10 seconds for the server's FlowFile Repository to be updated
        final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
        while (serverRepoRecords.size() < totalFlowFileCount && System.currentTimeMillis() < endTime) {
            Thread.sleep(10L);
        }

        assertFalse("Server's FlowFile Repo was never fully updated", serverRepoRecords.isEmpty());
        assertEquals(totalFlowFileCount, serverRepoRecords.size());

        for (final RepositoryRecord serverRecord : serverRepoRecords) {
            final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
            assertEquals("hello", serverFlowFile.getAttribute("greeting"));
        }

        // Unbounded on purpose: the @Test timeout is what fails the test if the client never catches up.
        while (clientRepoRecords.size() < totalFlowFileCount) {
            Thread.sleep(10L);
        }

        assertEquals(totalFlowFileCount, clientRepoRecords.size());

        for (final RepositoryRecord clientRecord : clientRepoRecords) {
            assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
        }
    } finally {
        clientTask.stop();
        flowFileQueue.stopLoadBalancing();
        clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        // Slots stay null if an earlier server failed to start, so skip them.
        Arrays.stream(servers).filter(Objects::nonNull).forEach(ConnectionLoadBalanceServer::stop);
    }
}
/**
 * Enqueues 1200 FlowFiles across three nodes (local, one reachable server, one host that does not
 * exist) and asserts that the reachable server ends up with one and a half times its per-node share,
 * each record carrying the expected attribute and content ending in "hello", and that the client
 * repository records {@code numFlowFiles / 2} DELETE records.
 *
 * @throws IOException if the server or client registry fails to start
 * @throws InterruptedException if the polling sleeps are interrupted
 */
@Test(timeout = 90_000)
public void testFailover() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Create the server
    final int timeoutMillis = 1000;
    final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    // Deliberately shadows the sslContext field: this test hands a null SSLContext to both client and server.
    final SSLContext sslContext = null;

    final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
    server.start();

    try {
        final int loadBalancePort = server.getPort();

        // Register the reachable remote node, using the port the server actually bound to
        final NodeIdentifier availableNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
        nodeIdentifiers.add(availableNodeId);

        // Add a Node Identifier pointing to a non-existent server
        final NodeIdentifier inaccessibleNodeId = new NodeIdentifier("unit-test-invalid-host-does-not-exist", "invalid-host-does-not-exist", 8090, "invalid-host-does-not-exist", 8090,
            "invalid-host-does-not-exist", loadBalancePort, null, null, null, false, null);
        nodeIdentifiers.add(inaccessibleNodeId);

        final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        clientRegistry.start();

        final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
        when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);

        final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
        final Thread clientThread = new Thread(clientTask);
        clientThread.setDaemon(true);

        // Create the Load Balanced FlowFile Queue
        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
            clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            final int numFlowFiles = 1200;
            for (int i = 0; i < numFlowFiles; i++) {
                final ContentClaim contentClaim = createContentClaim("hello".getBytes());

                final Map<String, String> attributes = new HashMap<>();
                attributes.put("uuid", UUID.randomUUID().toString());
                attributes.put("greeting", "hello");

                final MockFlowFileRecord flowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
                flowFileQueue.put(flowFile);
            }

            flowFileQueue.startLoadBalancing();
            clientThread.start();

            // Sending to one partition should fail. When that happens, half of that partition's FlowFiles should go
            // to the local partition and the other half to the reachable node. The reachable node is therefore
            // expected to receive 1.5 times its per-node share, where the per-node share is numFlowFiles / 3.
            final int flowFilesPerNode = numFlowFiles / 3;
            final int expectedFlowFileReceiveCount = flowFilesPerNode + flowFilesPerNode / 2;

            // Wait up to 60 seconds for the server's FlowFile Repository to be updated
            final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
            while (serverRepoRecords.size() < expectedFlowFileReceiveCount && System.currentTimeMillis() < endTime) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never fully updated", serverRepoRecords.isEmpty());
            assertEquals(expectedFlowFileReceiveCount, serverRepoRecords.size());

            for (final RepositoryRecord serverRecord : serverRepoRecords) {
                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
                assertEquals("hello", serverFlowFile.getAttribute("greeting"));

                final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
                final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
                // Only the trailing five bytes are compared against "hello".
                assertArrayEquals("hello".getBytes(), Arrays.copyOfRange(serverFlowFileContent, serverFlowFileContent.length - 5, serverFlowFileContent.length));
            }

            // We expect the client records to be numFlowFiles / 2 because half of the FlowFiles will have gone to the
            // other node in the cluster and half would still be in the local partition.
            // Unbounded on purpose: the @Test timeout is what fails the test if the client never catches up.
            while (clientRepoRecords.size() < numFlowFiles / 2) {
                Thread.sleep(10L);
            }

            assertEquals(numFlowFiles / 2, clientRepoRecords.size());

            for (final RepositoryRecord clientRecord : clientRepoRecords) {
                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
            }
        } finally {
            // Stop the client task so its thread does not keep running into later tests.
            clientTask.stop();
            flowFileQueue.stopLoadBalancing();
            clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        server.stop();
    }
}
/**
 * Enqueues two FlowFiles on a round-robin queue spanning the local node and one remote server.
 * Asserts that exactly one record reaches the server with the expected attributes and the content
 * "hello", and that the client repository records exactly one DELETE record.
 *
 * @throws IOException if the server or client registry fails to start
 * @throws InterruptedException if the polling sleeps are interrupted
 */
@Test(timeout = 20_000)
public void testTransferToRemoteNode() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Create the server
    final int timeoutMillis = 30000;
    final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    // Deliberately shadows the sslContext field: this test hands a null SSLContext to both client and server.
    final SSLContext sslContext = null;

    final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
    server.start();

    try {
        final int loadBalancePort = server.getPort();

        // Register the remote node, using the port the server actually bound to
        final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
        nodeIdentifiers.add(remoteNodeId);

        final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        clientRegistry.start();

        final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
        when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);

        final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
        final Thread clientThread = new Thread(clientTask);
        clientThread.setDaemon(true);
        clientThread.start();

        // Create the Load Balanced FlowFile Queue
        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
            clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
            flowFileQueue.put(firstFlowFile);

            final Map<String, String> attributes = new HashMap<>();
            attributes.put("integration", "test");
            attributes.put("unit-test", "false");
            attributes.put("integration-test", "true");

            final ContentClaim contentClaim = createContentClaim("hello".getBytes());
            final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
            flowFileQueue.put(secondFlowFile);

            flowFileQueue.startLoadBalancing();

            // Wait up to 10 seconds for the server's FlowFile Repository to be updated
            final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());

            assertEquals(1, serverRepoRecords.size());

            final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
            final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
            assertEquals("test", serverFlowFile.getAttribute("integration"));
            assertEquals("false", serverFlowFile.getAttribute("unit-test"));
            assertEquals("true", serverFlowFile.getAttribute("integration-test"));

            final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
            final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
            assertArrayEquals("hello".getBytes(), serverFlowFileContent);

            // Unbounded on purpose: the @Test timeout is what fails the test if the client never catches up.
            while (clientRepoRecords.size() == 0) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
            assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
        } finally {
            // Stop the client task so its thread does not keep running into later tests.
            clientTask.stop();
            flowFileQueue.stopLoadBalancing();
            clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        server.stop();
    }
}
/**
 * Enqueues two FlowFiles, the second of which references a content claim whose bytes have been
 * removed from the client-side content map, and asserts that the client repository records exactly
 * one record of type CONTENTMISSING.
 *
 * @throws IOException if the server or client registry fails to start
 * @throws InterruptedException if the polling sleep is interrupted
 */
@Test(timeout = 20_000)
public void testContentNotFound() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Create the server
    final int timeoutMillis = 30000;
    final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    // Deliberately shadows the sslContext field: this test hands a null SSLContext to both client and server.
    final SSLContext sslContext = null;

    final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
    server.start();

    try {
        final int loadBalancePort = server.getPort();

        // Register the remote node, using the port the server actually bound to
        final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
        nodeIdentifiers.add(remoteNodeId);

        final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        clientRegistry.start();

        final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
        when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);

        final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
        final Thread clientThread = new Thread(clientTask);
        clientThread.setDaemon(true);
        clientThread.start();

        // Create the Load Balanced FlowFile Queue
        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
            clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
            flowFileQueue.put(firstFlowFile);

            final Map<String, String> attributes = new HashMap<>();
            attributes.put("integration", "test");
            attributes.put("unit-test", "false");
            attributes.put("integration-test", "true");

            final ContentClaim contentClaim = createContentClaim("hello".getBytes());
            this.clientClaimContents.remove(contentClaim); // cause ContentNotFoundException

            final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
            flowFileQueue.put(secondFlowFile);

            flowFileQueue.startLoadBalancing();

            // Unbounded on purpose: the @Test timeout is what fails the test if no record is ever written.
            while (clientRepoRecords.size() == 0) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
            assertEquals(RepositoryRecordType.CONTENTMISSING, clientRecord.getType());
        } finally {
            // Stop the client task so its thread does not keep running into later tests.
            clientTask.stop();
            flowFileQueue.stopLoadBalancing();
            clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        server.stop();
    }
}
/**
 * Same scenario as a plain remote transfer, but with COMPRESS_ATTRIBUTES_ONLY set both on the
 * client-side queue and on the compression reported by the mocked server queue. Asserts that exactly
 * one record reaches the server with the expected attributes and the content "hello", and that the
 * client repository records exactly one DELETE record.
 *
 * @throws IOException if the server or client registry fails to start
 * @throws InterruptedException if the polling sleeps are interrupted
 */
@Test(timeout = 20_000)
public void testTransferToRemoteNodeAttributeCompression() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // The mocked server queue reports whatever this reference holds.
    compressionReference.set(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);

    // Create the server
    final int timeoutMillis = 30000;
    final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    // Deliberately shadows the sslContext field: this test hands a null SSLContext to both client and server.
    final SSLContext sslContext = null;

    final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
    server.start();

    try {
        final int loadBalancePort = server.getPort();

        // Register the remote node, using the port the server actually bound to
        final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
        nodeIdentifiers.add(remoteNodeId);

        final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        clientRegistry.start();

        final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
        when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);

        final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
        final Thread clientThread = new Thread(clientTask);
        clientThread.setDaemon(true);
        clientThread.start();

        // Create the Load Balanced FlowFile Queue
        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
            clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
        flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);

        try {
            final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
            flowFileQueue.put(firstFlowFile);

            final Map<String, String> attributes = new HashMap<>();
            attributes.put("integration", "test");
            attributes.put("unit-test", "false");
            attributes.put("integration-test", "true");

            final ContentClaim contentClaim = createContentClaim("hello".getBytes());
            final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
            flowFileQueue.put(secondFlowFile);

            flowFileQueue.startLoadBalancing();

            // Wait up to 10 seconds for the server's FlowFile Repository to be updated
            final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());

            assertEquals(1, serverRepoRecords.size());

            final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
            final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
            assertEquals("test", serverFlowFile.getAttribute("integration"));
            assertEquals("false", serverFlowFile.getAttribute("unit-test"));
            assertEquals("true", serverFlowFile.getAttribute("integration-test"));

            final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
            final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
            assertArrayEquals("hello".getBytes(), serverFlowFileContent);

            // Unbounded on purpose: the @Test timeout is what fails the test if the client never catches up.
            while (clientRepoRecords.size() == 0) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
            assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
        } finally {
            // Stop the client task so its thread does not keep running into later tests.
            clientTask.stop();
            flowFileQueue.stopLoadBalancing();
            clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        server.stop();
    }
}
/**
 * Same scenario as a plain remote transfer, but with COMPRESS_ATTRIBUTES_AND_CONTENT set both on the
 * client-side queue and on the compression reported by the mocked server queue. Asserts that exactly
 * one record reaches the server with the expected attributes and the content "hello", and that the
 * client repository records exactly one DELETE record.
 *
 * @throws IOException if the server or client registry fails to start
 * @throws InterruptedException if the polling sleeps are interrupted
 */
@Test(timeout = 20_000)
public void testTransferToRemoteNodeContentCompression() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // The mocked server queue reports whatever this reference holds.
    compressionReference.set(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);

    // Create the server
    final int timeoutMillis = 30000;
    final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    // Deliberately shadows the sslContext field: this test hands a null SSLContext to both client and server.
    final SSLContext sslContext = null;

    final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
    server.start();

    try {
        final int loadBalancePort = server.getPort();

        // Register the remote node, using the port the server actually bound to
        final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
        nodeIdentifiers.add(remoteNodeId);

        final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        clientRegistry.start();

        final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
        when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);

        final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
        final Thread clientThread = new Thread(clientTask);
        clientThread.setDaemon(true);
        clientThread.start();

        // Create the Load Balanced FlowFile Queue
        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
            clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
        flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);

        try {
            final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
            flowFileQueue.put(firstFlowFile);

            final Map<String, String> attributes = new HashMap<>();
            attributes.put("integration", "test");
            attributes.put("unit-test", "false");
            attributes.put("integration-test", "true");

            final ContentClaim contentClaim = createContentClaim("hello".getBytes());
            final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
            flowFileQueue.put(secondFlowFile);

            flowFileQueue.startLoadBalancing();

            // Wait up to 10 seconds for the server's FlowFile Repository to be updated
            final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());

            assertEquals(1, serverRepoRecords.size());

            final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
            final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
            assertEquals("test", serverFlowFile.getAttribute("integration"));
            assertEquals("false", serverFlowFile.getAttribute("unit-test"));
            assertEquals("true", serverFlowFile.getAttribute("integration-test"));

            final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
            final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
            assertArrayEquals("hello".getBytes(), serverFlowFileContent);

            // Unbounded on purpose: the @Test timeout is what fails the test if the client never catches up.
            while (clientRepoRecords.size() == 0) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
            assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
        } finally {
            // Stop the client task so its thread does not keep running into later tests.
            clientTask.stop();
            flowFileQueue.stopLoadBalancing();
            clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        server.stop();
    }
}
/**
 * Queues two FlowFiles on a round-robin partitioned queue, using the class's {@code sslContext} for both the
 * load-balance server and the client factory. Verifies that exactly one FlowFile (the one carrying attributes
 * and the content "hello") shows up in the server-side repository, and that the client-side repository
 * receives a single DELETE record.
 */
@Test(timeout = 20_000)
public void testWithSSLContext() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Receiving side: a load-balance server asked to listen on port 0 with a 30000 ms timeout
    final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, protocol, eventReporter, 30000);
    lbServer.start();

    try {
        // The remote node is described using whatever port the server reports after starting
        final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
        nodeIdentifiers.add(remoteNode);

        final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        registry.start();

        // The mocked coordinator reports CONNECTED for any node it is asked about
        final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
        when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);

        // Run the client task on its own daemon thread
        final Thread taskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
        taskThread.setDaemon(true);
        taskThread.start();

        final SocketLoadBalancedFlowFileQueue clientQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
        clientQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            // First FlowFile carries no attributes or content claim
            clientQueue.put(new MockFlowFileRecord(0L));

            // Second FlowFile carries three attributes and a content claim holding "hello"
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("integration", "test");
            attrs.put("unit-test", "false");
            attrs.put("integration-test", "true");
            final ContentClaim claim = createContentClaim("hello".getBytes());
            clientQueue.put(new MockFlowFileRecord(attrs, 5L, claim));

            clientQueue.startLoadBalancing();

            // Poll for at most 10 seconds until the server's FlowFile Repository has been updated
            final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
            assertEquals(1, serverRepoRecords.size());

            // The record seen by the server must match the second FlowFile
            final FlowFileRecord received = serverRepoRecords.iterator().next().getCurrent();
            assertEquals("test", received.getAttribute("integration"));
            assertEquals("false", received.getAttribute("unit-test"));
            assertEquals("true", received.getAttribute("integration-test"));

            final byte[] receivedContent = serverClaimContents.get(received.getContentClaim());
            assertArrayEquals("hello".getBytes(), receivedContent);

            // No deadline here; the @Test timeout bounds this wait for the client-side repository update
            while (clientRepoRecords.isEmpty()) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            assertEquals(RepositoryRecordType.DELETE, clientRepoRecords.iterator().next().getType());
        } finally {
            clientQueue.stopLoadBalancing();
            registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        lbServer.stop();
    }
}
/**
 * Runs ten consecutive rounds against a single server, client registry and queue. Each round queues two
 * FlowFiles, calls {@code startLoadBalancing()}, and then expects the server-side and client-side repositories
 * to each hold as many records as rounds completed so far.
 */
@Test(timeout = 60_000)
public void testReusingClient() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Receiving side: a load-balance server asked to listen on port 0 with a 30000 ms timeout
    final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, protocol, eventReporter, 30000);
    lbServer.start();

    try {
        // The remote node is described using whatever port the server reports after starting
        final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
        nodeIdentifiers.add(remoteNode);

        final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        registry.start();

        // The mocked coordinator reports CONNECTED for any node it is asked about
        final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
        when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);

        // Run the client task on its own daemon thread
        final Thread taskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
        taskThread.setDaemon(true);
        taskThread.start();

        final SocketLoadBalancedFlowFileQueue clientQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
        clientQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            for (int round = 1; round <= 10; round++) {
                // One FlowFile without attributes, one with attributes and "hello" content
                clientQueue.put(new MockFlowFileRecord(0L));

                final Map<String, String> attrs = new HashMap<>();
                attrs.put("integration", "test");
                attrs.put("unit-test", "false");
                attrs.put("integration-test", "true");
                final ContentClaim claim = createContentClaim("hello".getBytes());
                clientQueue.put(new MockFlowFileRecord(attrs, 5L, claim));

                clientQueue.startLoadBalancing();

                // Poll for at most 10 seconds until the server has accumulated one record per round
                final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
                while (serverRepoRecords.size() < round && System.currentTimeMillis() < deadline) {
                    Thread.sleep(10L);
                }
                assertEquals(round, serverRepoRecords.size());

                // Note: this inspects the first record held by the server repository on every round
                final FlowFileRecord received = serverRepoRecords.iterator().next().getCurrent();
                assertEquals("test", received.getAttribute("integration"));
                assertEquals("false", received.getAttribute("unit-test"));
                assertEquals("true", received.getAttribute("integration-test"));

                final byte[] receivedContent = serverClaimContents.get(received.getContentClaim());
                assertArrayEquals("hello".getBytes(), receivedContent);

                // No deadline here; the @Test timeout bounds this wait for the client-side repository update
                while (clientRepoRecords.size() < round) {
                    Thread.sleep(10L);
                }
                assertEquals(round, clientRepoRecords.size());

                // Likewise, the first client-side record is the one whose type is checked
                assertEquals(RepositoryRecordType.DELETE, clientRepoRecords.iterator().next().getType());
            }
        } finally {
            clientQueue.stopLoadBalancing();
            registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        lbServer.stop();
    }
}
/**
 * Same flow as the basic transfer tests, but the FlowFile that carries attributes has a content claim
 * of 1024 * 1024 bytes, each set to 'A'. Verifies that the server-side content matches the payload
 * byte-for-byte and that the client-side repository receives a single DELETE record.
 */
@Test(timeout = 20_000)
public void testLargePayload() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Receiving side: a load-balance server asked to listen on port 0 with a 30000 ms timeout
    final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, protocol, eventReporter, 30000);
    lbServer.start();

    try {
        // The remote node is described using whatever port the server reports after starting
        final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
        nodeIdentifiers.add(remoteNode);

        final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        registry.start();

        // The mocked coordinator reports CONNECTED for any node it is asked about
        final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
        when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);

        // Run the client task on its own daemon thread
        final Thread taskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
        taskThread.setDaemon(true);
        taskThread.start();

        final SocketLoadBalancedFlowFileQueue clientQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
        clientQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        // 1 MiB of the letter 'A'
        final byte[] payload = new byte[1024 * 1024];
        Arrays.fill(payload, (byte) 'A');

        try {
            // First FlowFile carries no attributes or content claim
            clientQueue.put(new MockFlowFileRecord(0L));

            // Second FlowFile carries three attributes and the large payload
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("integration", "test");
            attrs.put("unit-test", "false");
            attrs.put("integration-test", "true");
            final ContentClaim claim = createContentClaim(payload);
            clientQueue.put(new MockFlowFileRecord(attrs, payload.length, claim));

            clientQueue.startLoadBalancing();

            // Poll for at most 10 seconds until the server's FlowFile Repository has been updated
            final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
            assertEquals(1, serverRepoRecords.size());

            // The record seen by the server must match the second FlowFile
            final FlowFileRecord received = serverRepoRecords.iterator().next().getCurrent();
            assertEquals("test", received.getAttribute("integration"));
            assertEquals("false", received.getAttribute("unit-test"));
            assertEquals("true", received.getAttribute("integration-test"));

            final byte[] receivedContent = serverClaimContents.get(received.getContentClaim());
            assertArrayEquals(payload, receivedContent);

            // No deadline here; the @Test timeout bounds this wait for the client-side repository update
            while (clientRepoRecords.isEmpty()) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            assertEquals(RepositoryRecordType.DELETE, clientRepoRecords.iterator().next().getType());
        } finally {
            clientQueue.stopLoadBalancing();
            registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        lbServer.stop();
    }
}
/**
 * Re-stubs {@code serverContentRepo.write(...)} so that its first five invocations hand back a stream whose
 * {@code write} always throws an IOException, while later invocations hand back a stream that stores its bytes
 * in {@code serverClaimContents} when closed. A single FlowFile is then queued with a partitioner that never
 * picks the local partition, and the test verifies that the FlowFile still ends up in the server-side
 * repository and that the client-side repository receives a single DELETE record.
 * No SSLContext is used by this test (a null context is passed to the server and the client factory).
 */
@Test(timeout = 60_000)
public void testServerClosesUnexpectedly() throws IOException, InterruptedException {
    doAnswer(new Answer<OutputStream>() {
        // How many times this answer has been asked for a stream
        int invocationCount = 0;

        @Override
        public OutputStream answer(final InvocationOnMock invocation) {
            final int priorInvocations = invocationCount++;

            if (priorInvocations >= 5) {
                // From the sixth invocation on: capture whatever is written once the stream is closed
                final ContentClaim claim = invocation.getArgument(0);
                return new ByteArrayOutputStream() {
                    @Override
                    public void close() throws IOException {
                        super.close();
                        serverClaimContents.put(claim, toByteArray());
                    }
                };
            }

            // First five invocations: any attempt to write fails
            return new OutputStream() {
                @Override
                public void write(final int b) throws IOException {
                    throw new IOException("Intentional unit test failure");
                }
            };
        }
    }).when(serverContentRepo).write(any(ContentClaim.class));

    // Receiving side: a load-balance server asked to listen on port 0 with a 30000 ms timeout and no SSLContext
    final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    final SSLContext noSslContext = null;
    final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, noSslContext, 2, protocol, eventReporter, 30000);
    lbServer.start();

    try {
        // The remote node is described using whatever port the server reports after starting
        final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
        nodeIdentifiers.add(remoteNode);

        final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(noSslContext), 1);
        registry.start();

        // The mocked coordinator reports CONNECTED for any node it is asked about
        final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
        when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);

        // Run the client task on its own daemon thread
        final Thread taskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
        taskThread.setDaemon(true);
        taskThread.start();

        final SocketLoadBalancedFlowFileQueue clientQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);

        // Partitioner that returns the first partition that is not the local one (or null if there is none)
        clientQueue.setFlowFilePartitioner(new FlowFilePartitioner() {
            @Override
            public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
                for (int idx = 0; idx < partitions.length; idx++) {
                    final QueuePartition candidate = partitions[idx];
                    if (candidate != localPartition) {
                        return candidate;
                    }
                }

                return null;
            }

            @Override
            public boolean isRebalanceOnClusterResize() {
                return true;
            }

            @Override
            public boolean isRebalanceOnFailure() {
                return true;
            }
        });

        try {
            // Only one FlowFile is queued: three attributes and a content claim holding "hello"
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("integration", "test");
            attrs.put("unit-test", "false");
            attrs.put("integration-test", "true");
            final ContentClaim claim = createContentClaim("hello".getBytes());
            clientQueue.put(new MockFlowFileRecord(attrs, 5L, claim));

            clientQueue.startLoadBalancing();

            // Poll for at most 10 seconds until the server's FlowFile Repository has been updated
            final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10L);
            }

            assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
            assertEquals(1, serverRepoRecords.size());

            // The record seen by the server must match the FlowFile that was queued
            final FlowFileRecord received = serverRepoRecords.iterator().next().getCurrent();
            assertEquals("test", received.getAttribute("integration"));
            assertEquals("false", received.getAttribute("unit-test"));
            assertEquals("true", received.getAttribute("integration-test"));

            final byte[] receivedContent = serverClaimContents.get(received.getContentClaim());
            assertArrayEquals("hello".getBytes(), receivedContent);

            // No deadline here; the @Test timeout bounds this wait for the client-side repository update
            while (clientRepoRecords.isEmpty()) {
                Thread.sleep(10L);
            }

            assertEquals(1, clientRepoRecords.size());
            assertEquals(RepositoryRecordType.DELETE, clientRepoRecords.iterator().next().getType());
        } finally {
            clientQueue.stopLoadBalancing();
            registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        lbServer.stop();
    }
}
/**
 * Builds the server-side protocol with {@code NEVER_AUTHORIZED}, queues two FlowFiles, starts load balancing
 * and waits five seconds. Verifies that neither the server-side nor the client-side repository was updated
 * and that both FlowFiles are still counted in the queue.
 */
@Test(timeout = 20_000)
public void testNotAuthorized() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // Receiving side: same server setup as the other tests, except for the NEVER_AUTHORIZED protocol argument
    final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, NEVER_AUTHORIZED);
    final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, protocol, eventReporter, 30000);
    lbServer.start();

    try {
        // The remote node is described using whatever port the server reports after starting
        final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
        nodeIdentifiers.add(remoteNode);

        final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        registry.start();

        // The mocked coordinator reports CONNECTED for any node it is asked about
        final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
        when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);

        // Run the client task on its own daemon thread
        final Thread taskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
        taskThread.setDaemon(true);
        taskThread.start();

        final SocketLoadBalancedFlowFileQueue clientQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
        clientQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            // First FlowFile carries no attributes or content claim
            clientQueue.put(new MockFlowFileRecord(0L));

            // Second FlowFile carries three attributes and a content claim holding "hello"
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("integration", "test");
            attrs.put("unit-test", "false");
            attrs.put("integration-test", "true");
            final ContentClaim claim = createContentClaim("hello".getBytes());
            clientQueue.put(new MockFlowFileRecord(attrs, 5L, claim));

            clientQueue.startLoadBalancing();

            // Fixed wait, after which nothing is expected to have moved
            Thread.sleep(5000L);

            assertTrue("Server's FlowFile Repo was updated", serverRepoRecords.isEmpty());
            assertTrue(clientRepoRecords.isEmpty());
            assertEquals(2, clientQueue.size().getObjectCount());
        } finally {
            clientQueue.stopLoadBalancing();
            registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        lbServer.stop();
    }
}
/**
 * Stubs {@code serverQueue.isLocalPartitionFull()} to return true, queues two FlowFiles and starts load
 * balancing. After five seconds nothing must have been recorded in either repository and both FlowFiles
 * must still be counted in the queue. The stub is then switched to return false, and the test waits until
 * the client-side repository holds exactly one record, at which point the server-side repository must
 * hold one record as well.
 */
@Test(timeout = 35_000)
public void testDestinationNodeQueueFull() throws IOException, InterruptedException {
    localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
    nodeIdentifiers.add(localNodeId);

    // The server-side queue initially reports its local partition as full
    when(serverQueue.isLocalPartitionFull()).thenReturn(true);

    // Receiving side: a load-balance server asked to listen on port 0 with a 30000 ms timeout
    final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
    final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, protocol, eventReporter, 30000);
    lbServer.start();

    try {
        // The remote node is described using whatever port the server reports after starting
        final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
        nodeIdentifiers.add(remoteNode);

        final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
        registry.start();

        // The mocked coordinator reports CONNECTED for any node it is asked about
        final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
        when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);

        // Run the client task on its own daemon thread
        final Thread taskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
        taskThread.setDaemon(true);
        taskThread.start();

        final SocketLoadBalancedFlowFileQueue clientQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
        clientQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

        try {
            // First FlowFile carries no attributes or content claim
            clientQueue.put(new MockFlowFileRecord(0L));

            // Second FlowFile carries three attributes and a content claim holding "hello"
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("integration", "test");
            attrs.put("unit-test", "false");
            attrs.put("integration-test", "true");
            final ContentClaim claim = createContentClaim("hello".getBytes());
            clientQueue.put(new MockFlowFileRecord(attrs, 5L, claim));

            clientQueue.startLoadBalancing();

            // Fixed wait, after which nothing is expected to have moved while the partition reports full
            Thread.sleep(5000L);

            assertTrue("Server's FlowFile Repo was updated", serverRepoRecords.isEmpty());
            assertTrue(clientRepoRecords.isEmpty());
            assertEquals(2, clientQueue.size().getObjectCount());

            // Enable data to be transferred
            when(serverQueue.isLocalPartitionFull()).thenReturn(false);

            // No deadline here; the @Test timeout bounds this wait for exactly one client-side record
            while (clientRepoRecords.size() != 1) {
                Thread.sleep(10L);
            }

            assertEquals(1, serverRepoRecords.size());
        } finally {
            clientQueue.stopLoadBalancing();
            registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
        }
    } finally {
        lbServer.stop();
    }
}
/**
 * Creates a mock {@link FlowFileRepository} whose {@code updateRepository(...)} call appends every record
 * it is handed to the supplied list, so that tests can inspect what was "persisted".
 *
 * @param repoRecords the list that accumulates all records passed to {@code updateRepository(...)}
 * @return the mock repository
 * @throws IOException declared only because stubbing {@code updateRepository(...)} references a method that declares it
 */
private FlowFileRepository createFlowFileRepository(final List<RepositoryRecord> repoRecords) throws IOException {
    final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);

    doAnswer(invocation -> {
        // getArgument is generic, so the element type can be stated here rather than using a raw Collection,
        // which would make the addAll below an unchecked operation.
        final Collection<RepositoryRecord> records = invocation.getArgument(0);
        repoRecords.addAll(records);
        return null;
    }).when(flowFileRepo).updateRepository(anyCollection());

    return flowFileRepo;
}
/**
 * Creates a mock {@link ContentRepository} backed by the supplied map:
 * <ul>
 * <li>{@code create(boolean)} returns the result of {@code createContentClaim(null)};</li>
 * <li>{@code write(ContentClaim)} returns an in-memory stream that, when closed, stores its bytes in the map under that claim;</li>
 * <li>{@code read(ContentClaim)} returns an empty stream for a null claim, the stored bytes for a known claim,
 * and throws {@link ContentNotFoundException} for a claim with no entry in the map.</li>
 * </ul>
 *
 * @param claimContents the map that holds the bytes written for each content claim
 * @return the mock repository
 * @throws IOException declared only because the stubbed repository methods declare it
 */
private ContentRepository createContentRepository(final ConcurrentMap<ContentClaim, byte[]> claimContents) throws IOException {
    final ContentRepository contentRepo = mock(ContentRepository.class);

    // create(...): hand out a new claim
    Mockito.doAnswer(invocation -> createContentClaim(null))
            .when(contentRepo).create(Mockito.anyBoolean());

    // write(...): buffer in memory and publish the bytes to the map on close
    Mockito.doAnswer(invocation -> {
        final ContentClaim claim = invocation.getArgument(0);
        return new ByteArrayOutputStream() {
            @Override
            public void close() throws IOException {
                super.close();
                claimContents.put(claim, toByteArray());
            }
        };
    }).when(contentRepo).write(any(ContentClaim.class));

    // read(...): serve the bytes previously stored for the claim
    Mockito.doAnswer(invocation -> {
        final ContentClaim claim = invocation.getArgument(0);
        if (claim == null) {
            return new ByteArrayInputStream(new byte[0]);
        }

        final byte[] stored = claimContents.get(claim);
        if (stored == null) {
            throw new ContentNotFoundException(claim);
        }

        return new ByteArrayInputStream(stored);
    }).when(contentRepo).read(Mockito.nullable(ContentClaim.class));

    return contentRepo;
}
}