blob: 511620c4610bc8c979e6fc5b8d5aea5204ad7ad6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.node;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.state.MockStateMap;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.nifi.web.revision.RevisionSnapshot;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
public class TestNodeClusterCoordinator {
// Coordinator under test; re-created by setup() before every test method.
private NodeClusterCoordinator coordinator;
// Mocked protocol sender/listener that setup() passes to the coordinator's constructor.
private ClusterCoordinationProtocolSenderListener senderListener;
// Statuses captured by the notifyOthersOfNodeStatusChange override installed in setup();
// setup() assigns a synchronized list.
private List<NodeConnectionStatus> nodeStatuses;
// Mocked provider; setup() stubs it to return a StateManager whose state map is empty.
private StateManagerProvider stateManagerProvider;
// Snapshot built from an empty revision list and 0L, returned by the mocked RevisionManagers.
private final RevisionSnapshot emptyRevisionSnapshot = new RevisionSnapshot(Collections.emptyList(), 0L);
/**
 * Builds the NiFiProperties instance handed to each coordinator created by this test class.
 * The only explicit override is the ZooKeeper connect string.
 *
 * @return properties created via NiFiProperties.createBasicNiFiProperties with a null file path
 */
private NiFiProperties createProperties() {
    final Map<String, String> overrides = new HashMap<>();
    overrides.put("nifi.zookeeper.connect.string", "localhost:2181");

    final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, overrides);
    return properties;
}
/**
 * Prepares the shared fixture: mocked collaborators, a coordinator whose status notifications
 * are recorded in {@code nodeStatuses} instead of being sent anywhere, and a mocked FlowService
 * that returns a fixed data flow.
 *
 * @throws IOException declared because StateManager.getState is stubbed here
 */
@Before
public void setup() throws IOException {
    System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");

    nodeStatuses = Collections.synchronizedList(new ArrayList<>());
    senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);

    // State manager that reports an empty state map for any scope
    final StateManager emptyStateManager = Mockito.mock(StateManager.class);
    when(emptyStateManager.getState(any(Scope.class))).thenReturn(new MockStateMap(Collections.emptyMap(), 1));
    stateManagerProvider = Mockito.mock(StateManagerProvider.class);
    when(stateManagerProvider.getStateManager(anyString())).thenReturn(emptyStateManager);

    final RevisionManager revisions = Mockito.mock(RevisionManager.class);
    when(revisions.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
    final EventReporter reporter = Mockito.mock(EventReporter.class);

    // Record every status change in nodeStatuses so tests can inspect what would have been replicated
    coordinator = new NodeClusterCoordinator(senderListener, reporter, null, new FirstVoteWinsFlowElection(), null, revisions, createProperties(), null, stateManagerProvider) {
        @Override
        void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
            nodeStatuses.add(updatedStatus);
        }
    };

    final StandardDataFlow fixedFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>());
    final FlowService flowService = Mockito.mock(FlowService.class);
    when(flowService.createDataFlow()).thenReturn(fixedFlow);
    coordinator.setFlowService(flowService);
}
/**
 * Registers five nodes in various states, sends a connection request for a sixth node and
 * checks that the connection response lists the status of all six nodes.
 */
@Test
public void testConnectionResponseIndicatesAllNodes() throws IOException {
    // Seed the coordinator with one disconnected, one disconnecting, one connecting and two connected nodes
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));

    // Ask the coordinator to connect a node it has not seen yet
    final NodeIdentifier newNodeId = createNodeId(6);
    final ProtocolMessage reply = requestConnection(newNodeId, coordinator);
    assertNotNull(reply);
    assertTrue(reply instanceof ConnectionResponseMessage);

    final ConnectionResponseMessage replyMessage = (ConnectionResponseMessage) reply;
    final ConnectionResponse connectionResponse = replyMessage.getConnectionResponse();
    assertNotNull(connectionResponse);
    assertEquals(newNodeId, connectionResponse.getNodeIdentifier());
    assertNull(connectionResponse.getRejectionReason());

    final List<NodeConnectionStatus> reportedStatuses = connectionResponse.getNodeConnectionStatuses();
    assertNotNull(reportedStatuses);
    assertEquals(6, reportedStatuses.size());

    // Index the reported statuses by node so each one can be checked individually
    final Map<NodeIdentifier, NodeConnectionStatus> byNode = reportedStatuses.stream()
        .collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status));

    assertEquals(DisconnectionCode.LACK_OF_HEARTBEAT, byNode.get(createNodeId(1)).getDisconnectCode());
    assertEquals(NodeConnectionState.DISCONNECTING, byNode.get(createNodeId(2)).getState());
    assertEquals(NodeConnectionState.CONNECTING, byNode.get(createNodeId(3)).getState());
    assertEquals(NodeConnectionState.CONNECTED, byNode.get(createNodeId(4)).getState());
    assertEquals(NodeConnectionState.CONNECTED, byNode.get(createNodeId(5)).getState());
    assertEquals(NodeConnectionState.CONNECTING, byNode.get(createNodeId(6)).getState());
}
/**
 * Builds a coordinator that never receives a FlowService and checks that a connection request
 * is answered with a response whose try-later delay is 5 seconds.
 */
@Test
public void testTryAgainIfNoFlowServiceSet() throws IOException {
    final RevisionManager revisions = Mockito.mock(RevisionManager.class);
    when(revisions.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
    final EventReporter reporter = Mockito.mock(EventReporter.class);
    final ClusterCoordinationProtocolSenderListener localSender = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);

    // Status notifications are irrelevant for this test, so they are dropped
    final NodeClusterCoordinator flowlessCoordinator = new NodeClusterCoordinator(localSender, reporter, null, new FirstVoteWinsFlowElection(),
        null, revisions, createProperties(), null, stateManagerProvider) {
        @Override
        void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
        }
    };
    flowlessCoordinator.setConnected(true);

    final ProtocolMessage reply = requestConnection(createNodeId(6), flowlessCoordinator);
    assertNotNull(reply);
    assertTrue(reply instanceof ConnectionResponseMessage);

    final ConnectionResponse connectionResponse = ((ConnectionResponseMessage) reply).getConnectionResponse();
    assertNotNull(connectionResponse);
    assertEquals(5, connectionResponse.getTryLaterSeconds());
}
/**
 * Calls finishNodeConnection for a node the coordinator has never seen and checks that a
 * reconnection request carrying the FlowService's data flow is sent to that node.
 */
@Test(timeout = 5000)
public void testUnknownNodeAskedToConnectOnAttemptedConnectionComplete() throws IOException, InterruptedException {
    // Capture whatever reconnection request the coordinator sends
    final AtomicReference<ReconnectionRequestMessage> capturedRequest = new AtomicReference<>();
    final Answer<Object> captureAnswer = (InvocationOnMock invocation) -> {
        capturedRequest.set(invocation.getArgument(0));
        return null;
    };
    final ClusterCoordinationProtocolSenderListener capturingSender = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
    when(capturingSender.requestReconnection(any(ReconnectionRequestMessage.class))).thenAnswer(captureAnswer);

    final RevisionManager revisions = Mockito.mock(RevisionManager.class);
    when(revisions.getAllRevisions()).thenReturn(emptyRevisionSnapshot);
    final EventReporter reporter = Mockito.mock(EventReporter.class);

    final NodeClusterCoordinator localCoordinator = new NodeClusterCoordinator(capturingSender, reporter, null, new FirstVoteWinsFlowElection(),
        null, revisions, createProperties(), null, stateManagerProvider) {
        @Override
        void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
        }
    };

    final StandardDataFlow expectedFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>());
    final FlowService flowService = Mockito.mock(FlowService.class);
    when(flowService.createDataFlowFromController()).thenReturn(expectedFlow);
    localCoordinator.setFlowService(flowService);
    localCoordinator.setConnected(true);

    final NodeIdentifier unknownNodeId = createNodeId(1);
    localCoordinator.finishNodeConnection(unknownNodeId);

    // Poll until the request is captured; the @Test timeout bounds the wait
    ReconnectionRequestMessage sentRequest = capturedRequest.get();
    while (sentRequest == null) {
        Thread.sleep(10L);
        sentRequest = capturedRequest.get();
    }

    assertEquals(unknownNodeId, sentRequest.getNodeId());

    final StandardDataFlow sentFlow = sentRequest.getDataFlow();
    assertNotNull(sentFlow);
    assertTrue(Arrays.equals(expectedFlow.getFlow(), sentFlow.getFlow()));
    assertTrue(Arrays.equals(expectedFlow.getSnippets(), sentFlow.getSnippets()));
}
/**
 * Requests a connection for a node, then finishes the connection and checks that the node is
 * reported, and stored, as CONNECTED.
 */
@Test(timeout = 5000)
public void testFinishNodeConnectionResultsInConnectedState() throws IOException, InterruptedException {
    final NodeIdentifier nodeId = createNodeId(1);

    // Send a connection request to the coordinator and wait for the resulting notification
    requestConnection(nodeId, coordinator);
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(20L);
    }
    final NodeConnectionState stateAfterRequest = nodeStatuses.get(0).getState();
    assertEquals(NodeConnectionState.CONNECTING, stateAfterRequest);
    nodeStatuses.clear();

    // Completing the connection should notify all that the status is now 'CONNECTED'
    coordinator.finishNodeConnection(nodeId);
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(20L);
    }
    final NodeConnectionState notifiedState = nodeStatuses.get(0).getState();
    assertEquals(NodeConnectionState.CONNECTED, notifiedState);

    final NodeConnectionState storedState = coordinator.getConnectionStatus(nodeId).getState();
    assertEquals(NodeConnectionState.CONNECTED, storedState);
}
/**
 * Walks a node through CONNECTING, CONNECTED and a node-requested disconnect, and checks that
 * each transition is handed to notifyOthersOfNodeStatusChange (captured in {@code nodeStatuses}).
 */
@Test(timeout = 5000)
public void testStatusChangesReplicated() throws InterruptedException, IOException {
    // Create a connection request message and send to the coordinator
    final NodeIdentifier requestedNodeId = createNodeId(1);
    requestConnection(requestedNodeId, coordinator);

    // The above connection request should trigger a 'CONNECTING' state transition to be replicated
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(20L);
    }
    final NodeConnectionStatus connectingStatus = nodeStatuses.get(0);
    assertEquals(NodeConnectionState.CONNECTING, connectingStatus.getState());
    assertEquals(requestedNodeId, connectingStatus.getNodeIdentifier());

    // Discard the CONNECTING entry; otherwise the wait below would return immediately
    // instead of waiting for the CONNECTED notification.
    nodeStatuses.clear();

    // set node status to connected
    coordinator.finishNodeConnection(requestedNodeId);

    // the above method will result in the node identifier becoming 'CONNECTED'. Wait for this to happen and clear the list
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(20L);
    }
    assertEquals(NodeConnectionState.CONNECTED, nodeStatuses.get(0).getState());
    nodeStatuses.clear();

    coordinator.disconnectionRequestedByNode(requestedNodeId, DisconnectionCode.NODE_SHUTDOWN, "Unit Test");
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(20L);
    }

    assertEquals(1, nodeStatuses.size());
    final NodeConnectionStatus statusChange = nodeStatuses.get(0);
    assertNotNull(statusChange);
    assertEquals(createNodeId(1), statusChange.getNodeIdentifier());
    assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChange.getDisconnectCode());
    assertEquals("Unit Test", statusChange.getReason());
}
/**
 * Registers five nodes across four states and checks that getConnectionStates groups the node
 * identifiers by state.
 */
@Test
public void testGetConnectionStates() throws IOException {
    // One disconnected, one disconnecting, one connecting and two connected nodes
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));

    final Map<NodeConnectionState, List<NodeIdentifier>> idsByState = coordinator.getConnectionStates();
    assertEquals(4, idsByState.size());

    final List<NodeIdentifier> connected = idsByState.get(NodeConnectionState.CONNECTED);
    assertEquals(2, connected.size());
    assertTrue(connected.contains(createNodeId(4)));
    assertTrue(connected.contains(createNodeId(5)));

    final List<NodeIdentifier> connecting = idsByState.get(NodeConnectionState.CONNECTING);
    assertEquals(1, connecting.size());
    assertTrue(connecting.contains(createNodeId(3)));

    final List<NodeIdentifier> disconnecting = idsByState.get(NodeConnectionState.DISCONNECTING);
    assertEquals(1, disconnecting.size());
    assertTrue(disconnecting.contains(createNodeId(2)));

    final List<NodeIdentifier> disconnected = idsByState.get(NodeConnectionState.DISCONNECTED);
    assertEquals(1, disconnected.size());
    assertTrue(disconnected.contains(createNodeId(1)));
}
/**
 * Registers five nodes across four states and checks that getNodeIdentifiers(state) returns
 * exactly the nodes that are in the requested state.
 */
@Test
public void testGetNodeIdentifiers() throws IOException {
    // One disconnected, one disconnecting, one connecting and two connected nodes
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));

    final Set<NodeIdentifier> connected = coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
    assertEquals(2, connected.size());
    assertTrue(connected.contains(createNodeId(4)));
    assertTrue(connected.contains(createNodeId(5)));

    final Set<NodeIdentifier> connecting = coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTING);
    assertEquals(1, connecting.size());
    assertTrue(connecting.contains(createNodeId(3)));

    final Set<NodeIdentifier> disconnecting = coordinator.getNodeIdentifiers(NodeConnectionState.DISCONNECTING);
    assertEquals(1, disconnecting.size());
    assertTrue(disconnecting.contains(createNodeId(2)));

    final Set<NodeIdentifier> disconnected = coordinator.getNodeIdentifiers(NodeConnectionState.DISCONNECTED);
    assertEquals(1, disconnected.size());
    assertTrue(disconnected.contains(createNodeId(1)));
}
/**
 * Connects two nodes, requests that the first one be disconnected and checks that the
 * coordinator stores and announces the DISCONNECTED status for that node.
 */
@Test(timeout = 5000)
public void testRequestNodeDisconnect() throws InterruptedException {
    // Add two connected nodes
    final NodeIdentifier nodeId1 = createNodeId(1);
    coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
    coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED));

    // Wait for BOTH status change messages before clearing. Waiting for only one would let a
    // late CONNECTED notification land in the list after the clear and be read below as the
    // disconnect notification.
    while (nodeStatuses.size() < 2) {
        Thread.sleep(10L);
    }
    nodeStatuses.clear();

    coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
    assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState());

    // Wait for the disconnect to be announced; the @Test timeout bounds the wait
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(10L);
    }
    final NodeConnectionStatus status = nodeStatuses.get(0);
    assertEquals(nodeId1, status.getNodeIdentifier());
    assertEquals(NodeConnectionState.DISCONNECTED, status.getState());
}
/**
 * Connects two nodes, disconnects the second and checks that a request to disconnect the
 * remaining connected node is rejected with an IllegalNodeDisconnectionException.
 */
@Test(timeout = 5000)
public void testCannotDisconnectLastNode() throws InterruptedException {
    // Register two connected nodes
    final NodeIdentifier firstNode = createNodeId(1);
    final NodeIdentifier secondNode = createNodeId(2);
    coordinator.updateNodeStatus(new NodeConnectionStatus(firstNode, NodeConnectionState.CONNECTED));
    coordinator.updateNodeStatus(new NodeConnectionStatus(secondNode, NodeConnectionState.CONNECTED));

    // Wait until a status change message has been captured, then discard what was captured
    while (nodeStatuses.isEmpty()) {
        Thread.sleep(10L);
    }
    nodeStatuses.clear();

    coordinator.requestNodeDisconnect(secondNode, DisconnectionCode.USER_DISCONNECTED, "Unit Test");

    try {
        coordinator.requestNodeDisconnect(firstNode, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
        Assert.fail("Expected an IllegalNodeDisconnectionException when trying to disconnect last node but it wasn't thrown");
    } catch (final IllegalNodeDisconnectionException expected) {
        // expected: the first node is the only one still connected
    }

    // Asking the already-disconnected second node to disconnect again must still be allowed
    coordinator.requestNodeDisconnect(secondNode, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
}
/**
 * Delivers a NodeStatusChangeMessage built with update id -1 for an already-connected node and
 * checks that the coordinator does not announce any status change in response.
 */
@Test(timeout = 5000)
public void testUpdateNodeStatusOutOfOrder() throws InterruptedException {
    // Register two connected nodes
    final NodeIdentifier firstNode = createNodeId(1);
    final NodeIdentifier secondNode = createNodeId(2);
    coordinator.updateNodeStatus(new NodeConnectionStatus(firstNode, NodeConnectionState.CONNECTED));
    coordinator.updateNodeStatus(new NodeConnectionStatus(secondNode, NodeConnectionState.CONNECTED));

    // Wait for both status change messages and discard them
    while (nodeStatuses.size() < 2) {
        Thread.sleep(10L);
    }
    nodeStatuses.clear();

    final NodeConnectionStatus staleStatus = new NodeConnectionStatus(-1L, firstNode, NodeConnectionState.DISCONNECTED,
        null, DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L);

    final NodeStatusChangeMessage staleMessage = new NodeStatusChangeMessage();
    staleMessage.setNodeId(firstNode);
    staleMessage.setNodeConnectionStatus(staleStatus);
    coordinator.handle(staleMessage, Collections.emptySet());

    // Ensure that no status change message was sent
    Thread.sleep(1000);
    assertTrue(nodeStatuses.isEmpty());
}
/**
 * Sends two connection requests that share the node id "1234" but differ in API port and checks
 * that the second response keeps the id while reporting the second request's addresses and ports.
 */
@Test
public void testProposedIdentifierResolvedIfConflict() {
    final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);
    final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false);

    // First request: the proposed identifier should be accepted as-is
    final ProtocolMessage firstReply = requestConnection(id1, coordinator);
    assertNotNull(firstReply);
    assertTrue(firstReply instanceof ConnectionResponseMessage);

    final NodeIdentifier firstResolved = ((ConnectionResponseMessage) firstReply).getConnectionResponse().getNodeIdentifier();
    assertEquals(id1, firstResolved);

    // Second request: same id, different API port
    final ProtocolMessage secondReply = requestConnection(conflictingId, coordinator);
    assertNotNull(secondReply);
    assertTrue(secondReply instanceof ConnectionResponseMessage);

    final NodeIdentifier secondResolved = ((ConnectionResponseMessage) secondReply).getConnectionResponse().getNodeIdentifier();
    assertEquals(id1.getId(), secondResolved.getId());
    assertEquals(conflictingId.getApiAddress(), secondResolved.getApiAddress());
    assertEquals(conflictingId.getApiPort(), secondResolved.getApiPort());
    assertEquals(conflictingId.getSiteToSiteAddress(), secondResolved.getSiteToSiteAddress());
    assertEquals(conflictingId.getSiteToSitePort(), secondResolved.getSiteToSitePort());
    assertEquals(conflictingId.getSocketAddress(), secondResolved.getSocketAddress());
    assertEquals(conflictingId.getSocketPort(), secondResolved.getSocketPort());
}
/**
 * Connects a node, then sends a second request with the same node id but a different API port
 * and load-balance endpoint, and checks that the single registered identifier ends up with the
 * new load-balance address and port.
 */
@Test
public void testAddNodeIdentifierWithSameAddressDifferentLoadBalanceEndpoint() {
    // Add Node 1 to the cluster
    final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);
    final ProtocolMessage firstReply = requestConnection(id1, coordinator);
    assertNotNull(firstReply);
    assertTrue(firstReply instanceof ConnectionResponseMessage);

    final NodeIdentifier firstResolved = ((ConnectionResponseMessage) firstReply).getConnectionResponse().getNodeIdentifier();
    assertEquals(id1, firstResolved);

    // Add in a conflicting ID that carries an explicit load-balance endpoint
    final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "loadbalance-2", 4848, "localhost", 10000, 11000, false, null);
    final ProtocolMessage secondReply = requestConnection(conflictingId, coordinator);
    assertNotNull(secondReply);
    assertTrue(secondReply instanceof ConnectionResponseMessage);

    final NodeIdentifier secondResolved = ((ConnectionResponseMessage) secondReply).getConnectionResponse().getNodeIdentifier();
    assertEquals(id1.getId(), secondResolved.getId());
    assertEquals(conflictingId.getApiAddress(), secondResolved.getApiAddress());
    assertEquals(conflictingId.getApiPort(), secondResolved.getApiPort());
    assertEquals(conflictingId.getSiteToSiteAddress(), secondResolved.getSiteToSiteAddress());
    assertEquals(conflictingId.getSiteToSitePort(), secondResolved.getSiteToSitePort());
    assertEquals(conflictingId.getSocketAddress(), secondResolved.getSocketAddress());
    assertEquals(conflictingId.getSocketPort(), secondResolved.getSocketPort());

    // Ensure that the values were updated on the one registered identifier
    final Set<NodeIdentifier> registered = coordinator.getNodeIdentifiers();
    assertEquals(1, registered.size());

    final NodeIdentifier onlyRegistered = registered.iterator().next();
    assertEquals("loadbalance-2", onlyRegistered.getLoadBalanceAddress());
    assertEquals(4848, onlyRegistered.getLoadBalancePort());
}
/**
 * Creates a node identifier whose id is the decimal form of {@code index}, whose addresses are
 * all "localhost" and whose four ports are 8000, 9000, 10000 and 11000 plus {@code index}.
 *
 * @param index value used for the id and as the offset added to each base port
 * @return a new NodeIdentifier for the given index
 */
private NodeIdentifier createNodeId(final int index) {
    final String id = String.valueOf(index);
    final String host = "localhost";
    return new NodeIdentifier(id, host, 8000 + index, host, 9000 + index, host, 10000 + index, 11000 + index, false);
}
/**
 * Wraps the given node identifier and an empty data flow in a ConnectionRequestMessage and
 * passes it to the coordinator's handle method with an empty set as second argument.
 *
 * @param requestedNodeId identifier of the node asking to connect
 * @param coordinator coordinator that should handle the request
 * @return whatever the coordinator's handle method returns
 */
private ProtocolMessage requestConnection(final NodeIdentifier requestedNodeId, final NodeClusterCoordinator coordinator) {
    final StandardDataFlow emptyFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>());

    final ConnectionRequestMessage message = new ConnectionRequestMessage();
    message.setConnectionRequest(new ConnectionRequest(requestedNodeId, emptyFlow));

    return coordinator.handle(message, Collections.emptySet());
}
/**
 * FlowElection used by these tests: the first candidate passed to {@link #castVote} is elected
 * and every later vote is ignored.
 *
 * All methods that touch {@code dataFlow} or {@code voter} synchronize on the instance, so a
 * thread that reads the election state sees the result of a vote cast by another thread.
 */
private static class FirstVoteWinsFlowElection implements FlowElection {
    // Elected flow; null until the first vote has been cast
    private DataFlow dataFlow;
    // Node whose vote was recorded; null until the first vote has been cast
    private NodeIdentifier voter;

    /**
     * @return true once a flow has been elected, i.e. after the first vote
     */
    @Override
    public synchronized boolean isElectionComplete() {
        return dataFlow != null;
    }

    /**
     * Records the candidate and its voter if no vote has been recorded yet.
     *
     * @param candidate flow proposed by the voting node
     * @param nodeIdentifier node casting the vote
     * @return the elected flow: the candidate of the first vote
     */
    @Override
    public synchronized DataFlow castVote(DataFlow candidate, NodeIdentifier nodeIdentifier) {
        if (dataFlow == null) {
            dataFlow = candidate;
            voter = nodeIdentifier;
        }
        return dataFlow;
    }

    /**
     * @return the elected flow, or null if no vote has been cast
     */
    @Override
    public synchronized DataFlow getElectedDataFlow() {
        return dataFlow;
    }

    /**
     * @return a fixed description of this election strategy
     */
    @Override
    public String getStatusDescription() {
        return "First Vote Wins";
    }

    /**
     * @param nodeIdentifier node to check
     * @return true only for the node whose vote was recorded
     */
    @Override
    public synchronized boolean isVoteCounted(NodeIdentifier nodeIdentifier) {
        return voter != null && voter.equals(nodeIdentifier);
    }
}
}