// blob: ba6636f3b61fae709c770a870c7253babb12300f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.replication;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.authorization.user.StandardNiFiUser.Builder;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.util.MockReplicationClient;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestThreadPoolRequestReplicator {
/**
 * Sets the system property named by {@link NiFiProperties#PROPERTIES_FILE_PATH} to the
 * test properties file once, before the tests in this class run.
 */
@BeforeClass
public static void setupClass() {
    final String testPropertiesPath = "src/test/resources/conf/nifi.properties";
    System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, testPropertiesPath);
}
/**
 * Replicates a GET to a single node whose stubbed reply is 403 FORBIDDEN and verifies that,
 * once the merged response has been awaited, the replicator no longer returns a cluster
 * response for that request identifier.
 */
@Test
public void testFailedRequestsAreCleanedUp() {
    withReplicator(replicator -> {
        final Set<NodeIdentifier> nodeIds = new HashSet<>();
        nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
        final URI uri = new URI("http://localhost:8080/processors/1");
        final Entity entity = new ProcessorEntity();

        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);

        // We should get back the same response object, both before and after inspecting it
        Assert.assertSame(response, replicator.getClusterResponse(response.getRequestIdentifier()));
        assertEquals(HttpMethod.GET, response.getMethod());
        assertEquals(nodeIds, response.getNodesInvolved());
        Assert.assertSame(response, replicator.getClusterResponse(response.getRequestIdentifier()));

        final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
        // Guard against a null result so that a missing response fails with a clear message
        // instead of a NullPointerException on the next line.
        assertNotNull("awaitMergedResponse returned null after waiting 3 seconds", nodeResponse);
        assertEquals(8000, nodeResponse.getNodeId().getApiPort());
        assertEquals(Response.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus());

        // After the merged response was fetched, the replicator should have dropped the request
        assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
    }, Status.FORBIDDEN, 0L, null);
}
/**
 * If we replicate a request, whenever we obtain the merged response from
 * the AsyncClusterResponse object, the response should no longer be
 * available and should be cleared from internal state. This test is to
 * verify that this behavior occurs.
 */
@Test
public void testResponseRemovedWhenCompletedAndFetched() {
    withReplicator(replicator -> {
        final Set<NodeIdentifier> nodeIds = new HashSet<>();
        nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
        final URI uri = new URI("http://localhost:8080/processors/1");
        final Entity entity = new ProcessorEntity();

        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);

        // We should get back the same response object, both before and after inspecting it
        Assert.assertSame(response, replicator.getClusterResponse(response.getRequestIdentifier()));
        assertEquals(HttpMethod.GET, response.getMethod());
        assertEquals(nodeIds, response.getNodesInvolved());
        Assert.assertSame(response, replicator.getClusterResponse(response.getRequestIdentifier()));

        final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
        // Guard against a null result so that a missing response fails with a clear message
        // instead of a NullPointerException on the next line.
        assertNotNull("awaitMergedResponse returned null after waiting 3 seconds", nodeResponse);
        assertEquals(8000, nodeResponse.getNodeId().getApiPort());
        assertEquals(Response.Status.OK.getStatusCode(), nodeResponse.getStatus());

        // The merged response has been fetched, so the replicator should no longer track it
        assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
    });
}
/**
 * Replicates a GET on behalf of a user that was proxied through two other identities and
 * verifies that the proxied-entities chain header seen by the stubbed node request is
 * {@code <user><proxy-1><proxy-2>}.
 *
 * The header comparison itself lives in the {@code replicateRequest} stub created by
 * {@code withReplicator}; this test waits for the merged response so that the outcome of
 * that comparison is actually observed before the replicator is shut down.
 */
@Test
public void testRequestChain() {
    final String proxyIdentity2 = "proxy-2";
    final String proxyIdentity1 = "proxy-1";
    final String userIdentity = "user";
    final String expectedRequestChain = "<" + userIdentity + "><" + proxyIdentity1 + "><" + proxyIdentity2 + ">";

    withReplicator(replicator -> {
        final Set<NodeIdentifier> nodeIds = new HashSet<>();
        nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
        final URI uri = new URI("http://localhost:8080/processors/1");
        final Entity entity = new ProcessorEntity();

        // set the user
        final NiFiUser proxy2 = new Builder().identity(proxyIdentity2).build();
        final NiFiUser proxy1 = new Builder().identity(proxyIdentity1).chain(proxy2).build();
        final NiFiUser user = new Builder().identity(userIdentity).chain(proxy1).build();
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);

        // Without waiting here the test would return (and shut the replicator down) regardless of
        // whether the stub's header assertions passed.
        // NOTE(review): presumably a failed assertion in the stub surfaces as a missing or non-OK
        // merged response - confirm against ThreadPoolRequestReplicator.
        final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
        assertNotNull("Replicated request did not produce a merged response within 3 seconds", nodeResponse);
        assertEquals(Response.Status.OK.getStatusCode(), nodeResponse.getStatus());
    }, Response.Status.OK, 0L, null, expectedRequestChain);
}
/**
 * Same scenario as a proxied request chain, but the end user also carries two identity
 * provider groups. Verifies that the stubbed node request sees both the expected
 * proxied-entities chain header and the expected proxied-entity-groups header.
 *
 * The header comparisons live in the {@code replicateRequest} stub created by
 * {@code withReplicator}; this test waits for the merged response so that the outcome of
 * those comparisons is actually observed before the replicator is shut down.
 */
@Test
public void testRequestChainWithIdentityProviderGroups() {
    final String idpGroup1 = "idp-group-1";
    final String idpGroup2 = "idp-group-2";
    // LinkedHashSet keeps insertion order, which the expected header string below relies on
    final Set<String> idpGroups = new LinkedHashSet<>(Arrays.asList(idpGroup1, idpGroup2));
    final String expectedProxiedEntityGroups = "<" + idpGroup1 + "><" + idpGroup2 + ">";

    final String proxyIdentity2 = "proxy-2";
    final String proxyIdentity1 = "proxy-1";
    final String userIdentity = "user";
    final String expectedRequestChain = "<" + userIdentity + "><" + proxyIdentity1 + "><" + proxyIdentity2 + ">";

    withReplicator(replicator -> {
        final Set<NodeIdentifier> nodeIds = new HashSet<>();
        nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
        final URI uri = new URI("http://localhost:8080/processors/1");
        final Entity entity = new ProcessorEntity();

        // set the user
        final NiFiUser proxy2 = new Builder().identity(proxyIdentity2).build();
        final NiFiUser proxy1 = new Builder().identity(proxyIdentity1).chain(proxy2).build();
        final NiFiUser user = new Builder().identity(userIdentity).identityProviderGroups(idpGroups).chain(proxy1).build();
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);

        // Without waiting here the test would return (and shut the replicator down) regardless of
        // whether the stub's header assertions passed.
        // NOTE(review): presumably a failed assertion in the stub surfaces as a missing or non-OK
        // merged response - confirm against ThreadPoolRequestReplicator.
        final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
        assertNotNull("Replicated request did not produce a merged response within 3 seconds", nodeResponse);
        assertEquals(Response.Status.OK.getStatusCode(), nodeResponse.getStatus());
    }, Response.Status.OK, 0L, null, expectedRequestChain, expectedProxiedEntityGroups);
}
/**
 * Uses a node stub that sleeps for one second and then throws a {@link ProcessingException}
 * wrapping a {@link SocketTimeoutException}. Expects a merged response to be available
 * within two seconds, carrying a throwable and status 500, and expects the replicator to
 * stop returning the cluster response afterwards.
 */
@Test(timeout = 15000)
public void testLongWaitForResponse() {
    withReplicator(replicator -> {
        final NodeIdentifier soleNode = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
        final Set<NodeIdentifier> targetNodes = new HashSet<>();
        targetNodes.add(soleNode);

        final URI processorUri = new URI("http://localhost:8080/processors/1");
        final Entity payload = new ProcessorEntity();

        // set the user
        final Authentication anonymousAuth = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(anonymousAuth);

        final AsyncClusterResponse clusterResponse = replicator.replicate(targetNodes, HttpMethod.GET, processorUri, payload, new HashMap<>(), true, true);

        // The replicator should hand back the very same response object for this request
        assertTrue(clusterResponse == replicator.getClusterResponse(clusterResponse.getRequestIdentifier()));

        final NodeResponse merged = clusterResponse.awaitMergedResponse(2, TimeUnit.SECONDS);
        assertNotNull(merged);
        assertNotNull(merged.getThrowable());
        assertEquals(500, merged.getStatus());

        assertTrue(clusterResponse.isComplete());
        assertNotNull(clusterResponse.getMergedResponse());

        // Once fetched, the request should no longer be tracked
        assertNull(replicator.getClusterResponse(clusterResponse.getRequestIdentifier()));
    }, Status.OK, 1000, new ProcessingException(new SocketTimeoutException()));
}
/**
 * Replicates a GET to four nodes while the node stub throws an
 * {@link IllegalArgumentException} for every request, and expects a merged response to be
 * available within one second anyway.
 */
@Test(timeout = 15000)
public void testCompleteOnError() {
    withReplicator(replicator -> {
        // Nodes "1".."4" on localhost; node k uses ports 8k00, 8k01, 8k02 and 8k03
        final Set<NodeIdentifier> targetNodes = new HashSet<>();
        for (int node = 1; node <= 4; node++) {
            final int basePort = 8000 + node * 100;
            targetNodes.add(new NodeIdentifier(String.valueOf(node), "localhost", basePort, "localhost", basePort + 1,
                    "localhost", basePort + 2, basePort + 3, false));
        }

        final URI processorUri = new URI("http://localhost:8080/processors/1");
        final Entity payload = new ProcessorEntity();

        // set the user
        final Authentication anonymousAuth = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(anonymousAuth);

        final AsyncClusterResponse clusterResponse = replicator.replicate(targetNodes, HttpMethod.GET, processorUri, payload, new HashMap<>(), true, true);
        final NodeResponse merged = clusterResponse.awaitMergedResponse(1, TimeUnit.SECONDS);
        assertNotNull(merged);
    }, null, 0L, new IllegalArgumentException("Exception created for unit test"));
}
/**
 * Replicates a POST to a single connected node and checks that exactly two node requests
 * are issued: the first carrying the request-validation header with the value
 * {@code NODE_CONTINUE} (answered with 202 ACCEPTED), the second without that header
 * (answered with 200 OK).
 */
@Test(timeout = 15000)
public void testMultipleRequestWithTwoPhaseCommit() {
    final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false);
    final Set<NodeIdentifier> nodeIds = new HashSet<>();
    nodeIds.add(nodeId);

    // Coordinator that reports the single node as CONNECTED for any status lookup
    final ClusterCoordinator coordinator = mock(ClusterCoordinator.class);
    final NodeConnectionStatus connectedStatus = new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED);
    when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(connectedStatus);

    final AtomicInteger requestCount = new AtomicInteger(0);
    final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null);
    final MockReplicationClient client = new MockReplicationClient();
    final RequestCompletionCallback noOpCallback = (requestUri, httpMethod, nodeResponses) -> {
    };

    final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, noOpCallback, EventReporter.NO_OP, props) {
        @Override
        protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier targetNode,
                                                final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
            // Read the validation header straight from the prepared request's headers
            final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
            final boolean firstRequest = requestCount.incrementAndGet() == 1;

            final Status replyStatus;
            if (firstRequest) {
                assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
                replyStatus = Status.ACCEPTED;
            } else {
                assertNull(expectsHeader);
                replyStatus = Status.OK;
            }

            // Return given response from all nodes.
            final Response clientResponse = mock(Response.class);
            when(clientResponse.getStatus()).thenReturn(replyStatus.getStatusCode());
            return new NodeResponse(targetNode, request.getMethod(), uri, clientResponse, -1L, requestId);
        }
    };

    try {
        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final URI processorUri = new URI("http://localhost:80/processors/1");
        final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
                processorUri, new ProcessorEntity(), new HashMap<>(), true, true);
        clusterResponse.awaitMergedResponse();

        // Ensure that we received two requests - the first should carry the request-validation header; the second should not.
        // The header checks themselves are made above, in the overridden replicateRequest method.
        assertEquals(2, requestCount.get());
    } catch (final Exception e) {
        e.printStackTrace();
        Assert.fail(e.toString());
    } finally {
        replicator.shutdown();
    }
}
/**
 * Builds a mocked {@link ClusterCoordinator} whose {@code getConnectionStatus} answers
 * {@link NodeConnectionState#CONNECTED} for whichever node identifier it is asked about.
 *
 * @return the mocked coordinator
 */
private ClusterCoordinator createClusterCoordinator() {
    final Answer<NodeConnectionStatus> alwaysConnected = (final InvocationOnMock invocation) -> {
        final NodeIdentifier queriedNode = invocation.getArgument(0);
        return new NodeConnectionStatus(queriedNode, NodeConnectionState.CONNECTED);
    };

    final ClusterCoordinator clusterCoordinator = mock(ClusterCoordinator.class);
    when(clusterCoordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenAnswer(alwaysConnected);
    return clusterCoordinator;
}
/**
 * Verifies that a mutable (POST) request is rejected while any node is in the CONNECTING,
 * DISCONNECTED or DISCONNECTING state, that a GET is still accepted in that situation, and
 * that the POST is accepted once only CONNECTED nodes remain.
 *
 * The seven-argument {@code replicate} overload is stubbed to return {@code null}.
 * NOTE(review): presumably so that no request is actually sent and only the state check of
 * the four-argument overload is exercised - confirm against ThreadPoolRequestReplicator.
 *
 * @throws URISyntaxException if the hard-coded request URI cannot be parsed
 */
@Test
public void testMutableRequestRequiresAllNodesConnected() throws URISyntaxException {
    final ClusterCoordinator coordinator = createClusterCoordinator();

    // Two nodes that stay connected, plus a third node whose state is moved around below
    final List<NodeIdentifier> connectedNodes = new ArrayList<>();
    connectedNodes.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false));
    connectedNodes.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, 8203, false));
    final List<NodeIdentifier> thirdNode = new ArrayList<>();
    thirdNode.add(new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, 8303, false));

    // build a map of connection state to node ids; the mock returns this live map on every call
    final Map<NodeConnectionState, List<NodeIdentifier>> nodesByState = new HashMap<>();
    nodesByState.put(NodeConnectionState.CONNECTED, connectedNodes);
    nodesByState.put(NodeConnectionState.CONNECTING, thirdNode);
    when(coordinator.getConnectionStates()).thenReturn(nodesByState);

    final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null);
    final MockReplicationClient client = new MockReplicationClient();
    final RequestCompletionCallback noOpCallback = (requestUri, httpMethod, nodeResponses) -> {
    };

    final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, noOpCallback, EventReporter.NO_OP, props) {
        @Override
        public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
                                              boolean indicateReplicated, boolean verify) {
            return null;
        }
    };

    try {
        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final URI processorUri = new URI("http://localhost:80/processors/1");

        // Third node CONNECTING: POST must be rejected
        try {
            replicator.replicate(HttpMethod.POST, processorUri, new ProcessorEntity(), new HashMap<>());
            Assert.fail("Expected ConnectingNodeMutableRequestException");
        } catch (final ConnectingNodeMutableRequestException expected) {
            // expected behavior
        }

        // Third node DISCONNECTED: POST must be rejected
        nodesByState.remove(NodeConnectionState.CONNECTING);
        nodesByState.put(NodeConnectionState.DISCONNECTED, thirdNode);
        try {
            replicator.replicate(HttpMethod.POST, processorUri, new ProcessorEntity(), new HashMap<>());
            Assert.fail("Expected DisconnectedNodeMutableRequestException");
        } catch (final DisconnectedNodeMutableRequestException expected) {
            // expected behavior
        }

        // Third node DISCONNECTING: POST must be rejected
        nodesByState.remove(NodeConnectionState.DISCONNECTED);
        nodesByState.put(NodeConnectionState.DISCONNECTING, thirdNode);
        try {
            replicator.replicate(HttpMethod.POST, processorUri, new ProcessorEntity(), new HashMap<>());
            Assert.fail("Expected DisconnectedNodeMutableRequestException");
        } catch (final DisconnectedNodeMutableRequestException expected) {
            // expected behavior
        }

        // should not throw an Exception because it's a GET
        replicator.replicate(HttpMethod.GET, processorUri, new MultivaluedHashMap<>(), new HashMap<>());

        // should not throw an Exception because all nodes are now connected
        nodesByState.remove(NodeConnectionState.DISCONNECTING);
        replicator.replicate(HttpMethod.POST, processorUri, new ProcessorEntity(), new HashMap<>());
    } finally {
        replicator.shutdown();
    }
}
/**
 * Replicates a POST to two nodes where the first node request handled is answered with 202
 * and every later one yields a {@link NodeResponse} wrapping an
 * {@link IllegalClusterStateException}. Awaiting the merged response is expected to throw
 * {@link IllegalClusterStateException}.
 */
@Test(timeout = 15000)
public void testOneNodeRejectsTwoPhaseCommit() {
    final Set<NodeIdentifier> targetNodes = new HashSet<>();
    targetNodes.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false));
    targetNodes.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, 8203, false));

    final ClusterCoordinator coordinator = createClusterCoordinator();
    final AtomicInteger requestCount = new AtomicInteger(0);
    final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null);
    final MockReplicationClient client = new MockReplicationClient();
    final RequestCompletionCallback noOpCallback = (requestUri, httpMethod, nodeResponses) -> {
    };

    final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, noOpCallback, EventReporter.NO_OP, props) {
        @Override
        protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
                                                final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
            // Read the validation header straight from the prepared request's headers
            final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
            final int requestIndex = requestCount.incrementAndGet();
            assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);

            if (requestIndex != 1) {
                // Every request after the first is rejected
                final IllegalClusterStateException explanation = new IllegalClusterStateException("Intentional Exception for Unit Testing");
                return new NodeResponse(nodeId, request.getMethod(), uri, explanation);
            }

            // The first request handled is accepted
            final Response clientResponse = mock(Response.class);
            when(clientResponse.getStatus()).thenReturn(202);
            return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId);
        }
    };

    try {
        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        final URI processorUri = new URI("http://localhost:80/processors/1");
        final AsyncClusterResponse clusterResponse = replicator.replicate(targetNodes, HttpMethod.POST,
                processorUri, new ProcessorEntity(), new HashMap<>(), true, true);
        clusterResponse.awaitMergedResponse();

        Assert.fail("Expected to get an IllegalClusterStateException but did not");
    } catch (final IllegalClusterStateException expected) {
        // Expected
    } catch (final Exception e) {
        Assert.fail(e.toString());
    } finally {
        replicator.shutdown();
    }
}
/**
 * Verifies that the monitor object handed to {@code replicate} is notified even when
 * {@code replicate} throws. An empty node set is passed so that the call fails with
 * {@link IllegalArgumentException}; a background thread waiting on the monitor must then be
 * released, otherwise the test times out.
 */
@Test(timeout = 5000)
public void testMonitorNotifiedOnException() {
    withReplicator(replicator -> {
        final Object monitor = new Object();
        final CountDownLatch waiterReady = new CountDownLatch(1);
        final CountDownLatch waiterReleased = new CountDownLatch(1);

        final Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                boolean woken = false;
                while (!woken) {
                    // If monitor is not notified, this will block indefinitely, and the test will timeout
                    try {
                        waiterReady.countDown();
                        monitor.wait();
                        woken = true;
                    } catch (final InterruptedException ignored) {
                        // interrupted rather than notified: go back to waiting
                    }
                }
                waiterReleased.countDown();
            }
        });
        waiter.start();

        // wait for the background thread to signal that it is synchronized on monitor.
        waiterReady.await();

        try {
            // set the user
            final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
            SecurityContextHolder.getContext().setAuthentication(authentication);

            // ensure the proxied entities header is set
            final Map<String, String> requestHeaders = new HashMap<>();
            replicator.updateRequestHeaders(requestHeaders, NiFiUserUtils.getNiFiUser());

            // Pass in Collections.emptySet() for the node ID's so that an Exception is thrown
            replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(),
                    requestHeaders, true, null, true, true, monitor);
            Assert.fail("replicate did not throw IllegalArgumentException");
        } catch (final IllegalArgumentException expected) {
            // expected
        }

        // wait for monitor to be notified.
        waiterReleased.await();
    });
}
/**
 * Verifies that the monitor object handed to {@code replicate} is notified when the node
 * stub answers 200 OK. A background thread waits on the monitor and must be released,
 * otherwise the test times out.
 */
@Test(timeout = 5000)
public void testMonitorNotifiedOnSuccessfulCompletion() {
    withReplicator(replicator -> {
        final Object monitor = new Object();
        final CountDownLatch waiterReady = new CountDownLatch(1);
        final CountDownLatch waiterReleased = new CountDownLatch(1);

        final Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                boolean woken = false;
                while (!woken) {
                    // If monitor is not notified, this will block indefinitely, and the test will timeout
                    try {
                        waiterReady.countDown();
                        monitor.wait();
                        woken = true;
                    } catch (final InterruptedException ignored) {
                        // interrupted rather than notified: go back to waiting
                    }
                }
                waiterReleased.countDown();
            }
        });
        waiter.start();

        // wait for the background thread to signal that it is synchronized on monitor.
        waiterReady.await();

        final NodeIdentifier soleNode = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
        final Set<NodeIdentifier> targetNodes = new HashSet<>();
        targetNodes.add(soleNode);
        final URI processorUri = new URI("http://localhost:8080/processors/1");
        final Entity payload = new ProcessorEntity();

        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        // ensure the proxied entities header is set
        final Map<String, String> requestHeaders = new HashMap<>();
        replicator.updateRequestHeaders(requestHeaders, NiFiUserUtils.getNiFiUser());

        replicator.replicate(targetNodes, HttpMethod.GET, processorUri, payload, requestHeaders, true, null, true, true, monitor);

        // wait for monitor to be notified.
        waiterReleased.await();
    });
}
/**
 * Verifies that the monitor object handed to {@code replicate} is notified when the node
 * stub answers 500 INTERNAL_SERVER_ERROR. A background thread waits on the monitor and
 * must be released, otherwise the test times out.
 */
@Test(timeout = 5000)
public void testMonitorNotifiedOnFailureResponse() {
    withReplicator(replicator -> {
        final Object monitor = new Object();
        final CountDownLatch waiterReady = new CountDownLatch(1);
        final CountDownLatch waiterReleased = new CountDownLatch(1);

        final Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                boolean woken = false;
                while (!woken) {
                    // If monitor is not notified, this will block indefinitely, and the test will timeout
                    try {
                        waiterReady.countDown();
                        monitor.wait();
                        woken = true;
                    } catch (final InterruptedException ignored) {
                        // interrupted rather than notified: go back to waiting
                    }
                }
                waiterReleased.countDown();
            }
        });
        waiter.start();

        // wait for the background thread to signal that it is synchronized on monitor.
        waiterReady.await();

        final NodeIdentifier soleNode = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
        final Set<NodeIdentifier> targetNodes = new HashSet<>();
        targetNodes.add(soleNode);
        final URI processorUri = new URI("http://localhost:8080/processors/1");
        final Entity payload = new ProcessorEntity();

        // set the user
        final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
        SecurityContextHolder.getContext().setAuthentication(authentication);

        // ensure the proxied entities header is set
        final Map<String, String> requestHeaders = new HashMap<>();
        replicator.updateRequestHeaders(requestHeaders, NiFiUserUtils.getNiFiUser());

        replicator.replicate(targetNodes, HttpMethod.GET, processorUri, payload, requestHeaders, true, null, true, true, monitor);

        // wait for monitor to be notified.
        waiterReleased.await();
    }, Status.INTERNAL_SERVER_ERROR, 0L, null);
}
/**
 * Runs {@code function} against a replicator whose node stub answers 200 OK immediately
 * and never throws.
 *
 * @param function the test body to run with the replicator
 */
private void withReplicator(final WithReplicator function) {
    final long noDelay = 0L;
    final RuntimeException noFailure = null;
    withReplicator(function, Status.OK, noDelay, noFailure);
}
/**
 * Runs {@code function} against a replicator whose node stub expects both the
 * proxied-entities chain header and the proxied-entity-groups header to equal {@code "<>"}.
 *
 * @param function    the test body to run with the replicator
 * @param status      the status the node stub answers with
 * @param delayMillis how long the node stub sleeps before answering; not positive means no sleep
 * @param failure     exception the node stub throws instead of answering, or {@code null}
 */
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure) {
    final String defaultExpectedHeader = "<>";
    withReplicator(function, status, delayMillis, failure, defaultExpectedHeader, defaultExpectedHeader);
}
/**
 * Runs {@code function} against a replicator whose node stub expects the given
 * proxied-entities chain header and a proxied-entity-groups header equal to {@code "<>"}.
 *
 * @param function             the test body to run with the replicator
 * @param status               the status the node stub answers with
 * @param delayMillis          how long the node stub sleeps before answering; not positive means no sleep
 * @param failure              exception the node stub throws instead of answering, or {@code null}
 * @param expectedRequestChain the proxied-entities chain header value the node stub expects
 */
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure,
                            final String expectedRequestChain) {
    final String expectedProxiedEntityGroups = "<>";
    withReplicator(function, status, delayMillis, failure, expectedRequestChain, expectedProxiedEntityGroups);
}
/**
 * Creates a {@link ThreadPoolRequestReplicator} whose {@code replicateRequest} is replaced
 * by a stub, runs {@code function} against it, and always shuts the replicator down.
 *
 * For every node request the stub optionally sleeps, optionally throws {@code failure},
 * and otherwise asserts the proxied-entities chain and proxied-entity-groups headers before
 * answering with {@code status}.
 *
 * @param function                    the test body to run with the replicator
 * @param status                      the status the node stub answers with; only read when {@code failure} is {@code null}
 * @param delayMillis                 how long the node stub sleeps before answering; not positive means no sleep
 * @param failure                     exception the node stub throws instead of answering, or {@code null}
 * @param expectedRequestChain        the proxied-entities chain header value the node stub expects
 * @param expectedProxiedEntityGroups the proxied-entity-groups header value the node stub expects
 * @throws AssertionError if {@code function} throws any {@link Exception}; the exception is attached as the cause
 */
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure,
                            final String expectedRequestChain, final String expectedProxiedEntityGroups) {
    final ClusterCoordinator coordinator = createClusterCoordinator();
    final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null);
    final MockReplicationClient client = new MockReplicationClient();
    final RequestCompletionCallback requestCompletionCallback = (uri, method, responses) -> {
    };

    final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(5, 100, client, coordinator, requestCompletionCallback, EventReporter.NO_OP, nifiProps) {
        @Override
        protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, final URI uri, final String requestId,
                                                final StandardAsyncClusterResponse response) {
            if (delayMillis > 0L) {
                try {
                    Thread.sleep(delayMillis);
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag so the owning thread can still see the interruption
                    Thread.currentThread().interrupt();
                    Assert.fail("Thread Interrupted during test");
                }
            }

            if (failure != null) {
                throw failure;
            }

            // ensure the request chain is in the request
            final Object proxiedEntities = request.getHeaders().get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN);
            Assert.assertEquals(expectedRequestChain, proxiedEntities);

            // ensure the proxied entity groups are in the request
            final Object proxiedEntityGroups = request.getHeaders().get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS);
            Assert.assertEquals(expectedProxiedEntityGroups, proxiedEntityGroups);

            // Return given response from all nodes.
            final Response clientResponse = mock(Response.class);
            when(clientResponse.getStatus()).thenReturn(status.getStatusCode());
            return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId);
        }
    };

    try {
        function.withReplicator(replicator);
    } catch (final Exception e) {
        // Same message as before, but keep the original exception as the cause so the
        // test report shows where the failure actually came from.
        throw new AssertionError(e.toString(), e);
    } finally {
        replicator.shutdown();
    }
}
/**
 * A test body that is run against a prepared {@link ThreadPoolRequestReplicator}.
 * Implemented with lambdas throughout this class, so it must keep exactly one abstract method.
 */
@FunctionalInterface
private interface WithReplicator {
    /**
     * Runs the test body.
     *
     * @param replicator the replicator under test
     * @throws Exception if the test body fails for any reason
     */
    void withReplicator(ThreadPoolRequestReplicator replicator) throws Exception;
}
}