| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.controller.queue.clustered.server; |
| |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.controller.FlowController; |
| import org.apache.nifi.controller.flow.FlowManager; |
| import org.apache.nifi.controller.queue.IllegalClusterStateException; |
| import org.apache.nifi.controller.queue.LoadBalanceCompression; |
| import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; |
| import org.apache.nifi.controller.repository.ContentRepository; |
| import org.apache.nifi.controller.repository.FlowFileRecord; |
| import org.apache.nifi.controller.repository.FlowFileRepository; |
| import org.apache.nifi.controller.repository.RepositoryRecord; |
| import org.apache.nifi.controller.repository.claim.ContentClaim; |
| import org.apache.nifi.controller.repository.claim.ResourceClaim; |
| import org.apache.nifi.provenance.ProvenanceEventRecord; |
| import org.apache.nifi.provenance.ProvenanceEventType; |
| import org.apache.nifi.provenance.ProvenanceRepository; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PipedInputStream; |
| import java.io.PipedOutputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.zip.CRC32; |
| import java.util.zip.CheckedOutputStream; |
| import java.util.zip.Checksum; |
| |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CHECK_SPACE; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SKIP_SPACE_CHECK; |
| import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SPACE_AVAILABLE; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.anyCollection; |
| import static org.mockito.ArgumentMatchers.anyList; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.when; |
| |
| public class TestStandardLoadBalanceProtocol { |
| private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com"; |
| private FlowFileRepository flowFileRepo; |
| private ContentRepository contentRepo; |
| private ProvenanceRepository provenanceRepo; |
| private FlowController flowController; |
| private LoadBalancedFlowFileQueue flowFileQueue; |
| |
| private List<RepositoryRecord> flowFileRepoUpdateRecords; |
| private List<ProvenanceEventRecord> provRepoUpdateRecords; |
| private List<FlowFileRecord> flowFileQueuePutRecords; |
| private List<FlowFileRecord> flowFileQueueReceiveRecords; |
| |
| private ConcurrentMap<ContentClaim, byte[]> claimContents; |
| |
| |
| @Before |
| public void setup() throws IOException, IllegalClusterStateException { |
| flowFileQueuePutRecords = new ArrayList<>(); |
| flowFileQueueReceiveRecords = new ArrayList<>(); |
| flowFileRepoUpdateRecords = new ArrayList<>(); |
| provRepoUpdateRecords = new ArrayList<>(); |
| |
| flowFileRepo = Mockito.mock(FlowFileRepository.class); |
| contentRepo = Mockito.mock(ContentRepository.class); |
| provenanceRepo = Mockito.mock(ProvenanceRepository.class); |
| flowController = Mockito.mock(FlowController.class); |
| claimContents = new ConcurrentHashMap<>(); |
| |
| Mockito.doAnswer(new Answer<ContentClaim>() { |
| @Override |
| public ContentClaim answer(final InvocationOnMock invocation) throws Throwable { |
| final ContentClaim contentClaim = Mockito.mock(ContentClaim.class); |
| final ResourceClaim resourceClaim = Mockito.mock(ResourceClaim.class); |
| when(contentClaim.getResourceClaim()).thenReturn(resourceClaim); |
| return contentClaim; |
| } |
| }).when(contentRepo).create(Mockito.anyBoolean()); |
| |
| Mockito.doAnswer(new Answer<OutputStream>() { |
| @Override |
| public OutputStream answer(final InvocationOnMock invocation) throws Throwable { |
| final ContentClaim contentClaim = invocation.getArgument(0); |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream() { |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| claimContents.put(contentClaim, toByteArray()); |
| } |
| }; |
| |
| return baos; |
| } |
| }).when(contentRepo).write(Mockito.any(ContentClaim.class)); |
| |
| final Connection connection = Mockito.mock(Connection.class); |
| final FlowManager flowManager = Mockito.mock(FlowManager.class); |
| when(flowManager.getConnection(Mockito.anyString())).thenReturn(connection); |
| when(flowController.getFlowManager()).thenReturn(flowManager); |
| |
| flowFileQueue = Mockito.mock(LoadBalancedFlowFileQueue.class); |
| when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS); |
| when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); |
| |
| Mockito.doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(final InvocationOnMock invocation) throws Throwable { |
| flowFileQueuePutRecords.addAll(invocation.getArgument(0)); |
| return null; |
| } |
| }).when(flowFileQueue).putAll(anyCollection()); |
| |
| Mockito.doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(final InvocationOnMock invocation) throws Throwable { |
| flowFileQueueReceiveRecords.addAll(invocation.getArgument(0)); |
| return null; |
| } |
| }).when(flowFileQueue).receiveFromPeer(anyCollection()); |
| |
| Mockito.doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(final InvocationOnMock invocation) throws Throwable { |
| flowFileRepoUpdateRecords.addAll(invocation.getArgument(0)); |
| return null; |
| } |
| }).when(flowFileRepo).updateRepository(anyCollection()); |
| |
| Mockito.doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(final InvocationOnMock invocation) throws Throwable { |
| provRepoUpdateRecords.addAll(invocation.getArgument(0)); |
| return null; |
| } |
| }).when(provenanceRepo).registerEvents(anyCollection()); |
| } |
| |
| |
| @Test |
| public void testSimpleFlowFileTransaction() throws IOException, IllegalClusterStateException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("a", "A"); |
| attributes.put("uuid", "unit-test-id"); |
| attributes.put("b", "B"); |
| |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent("hello".getBytes(), dos); |
| dos.write(NO_MORE_FLOWFILES); |
| |
| dos.writeLong(checksum.getValue()); |
| dos.write(COMPLETE_TRANSACTION); |
| |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(3, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); |
| assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]); |
| |
| assertEquals(1, claimContents.size()); |
| final byte[] firstFlowFileContent = claimContents.values().iterator().next(); |
| assertArrayEquals("hello".getBytes(), firstFlowFileContent); |
| |
| Mockito.verify(flowFileRepo, times(1)).updateRepository(anyCollection()); |
| Mockito.verify(provenanceRepo, times(1)).registerEvents(anyList()); |
| Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); |
| Mockito.verify(flowFileQueue, times(1)).receiveFromPeer(anyCollection()); |
| } |
| |
| @Test |
| public void testMultipleFlowFiles() throws IOException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("a", "A"); |
| attributes.put("uuid", "unit-test-id"); |
| attributes.put("b", "B"); |
| |
| // Send 4 FlowFiles. |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent("hello".getBytes(), dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), dos); |
| writeContent(null, dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), dos); |
| writeContent("greetings".getBytes(), dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), dos); |
| writeContent(new byte[0], dos); |
| |
| dos.write(NO_MORE_FLOWFILES); |
| |
| dos.writeLong(checksum.getValue()); |
| dos.write(COMPLETE_TRANSACTION); |
| |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(3, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); |
| assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]); |
| |
| assertEquals(1, claimContents.size()); |
| final byte[] bytes = claimContents.values().iterator().next(); |
| assertTrue(Arrays.equals("hellogreetings".getBytes(), bytes) || Arrays.equals("greetingshello".getBytes(), bytes)); |
| |
| assertEquals(4, flowFileRepoUpdateRecords.size()); |
| assertEquals(4, provRepoUpdateRecords.size()); |
| assertEquals(0, flowFileQueuePutRecords.size()); |
| assertEquals(4, flowFileQueueReceiveRecords.size()); |
| |
| assertTrue(provRepoUpdateRecords.stream().allMatch(event -> event.getEventType() == ProvenanceEventType.RECEIVE)); |
| } |
| |
| |
| @Test |
| public void testMultipleFlowFilesWithoutCheckingSpace() throws IOException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("a", "A"); |
| attributes.put("uuid", "unit-test-id"); |
| attributes.put("b", "B"); |
| |
| // Send 4 FlowFiles. |
| dos.write(SKIP_SPACE_CHECK); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent("hello".getBytes(), dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), dos); |
| writeContent(null, dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), dos); |
| writeContent("greetings".getBytes(), dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), dos); |
| writeContent(new byte[0], dos); |
| |
| dos.write(NO_MORE_FLOWFILES); |
| |
| dos.writeLong(checksum.getValue()); |
| dos.write(COMPLETE_TRANSACTION); |
| |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(2, serverResponse.length); |
| assertEquals(CONFIRM_CHECKSUM, serverResponse[0]); |
| assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[1]); |
| |
| assertEquals(1, claimContents.size()); |
| final byte[] bytes = claimContents.values().iterator().next(); |
| assertTrue(Arrays.equals("hellogreetings".getBytes(), bytes) || Arrays.equals("greetingshello".getBytes(), bytes)); |
| |
| assertEquals(4, flowFileRepoUpdateRecords.size()); |
| assertEquals(4, provRepoUpdateRecords.size()); |
| assertEquals(0, flowFileQueuePutRecords.size()); |
| assertEquals(4, flowFileQueueReceiveRecords.size()); |
| |
| assertTrue(provRepoUpdateRecords.stream().allMatch(event -> event.getEventType() == ProvenanceEventType.RECEIVE)); |
| } |
| |
| @Test |
| public void testEofExceptionMultipleFlowFiles() throws IOException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("a", "A"); |
| attributes.put("uuid", "unit-test-id"); |
| attributes.put("b", "B"); |
| |
| // Send 4 FlowFiles. |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent("hello".getBytes(), dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), dos); |
| writeContent(null, dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), dos); |
| writeContent("greetings".getBytes(), dos); |
| |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), dos); |
| writeContent(new byte[0], dos); |
| |
| dos.flush(); |
| dos.close(); |
| |
| try { |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| Assert.fail("Expected EOFException but none was thrown"); |
| } catch (final EOFException eof) { |
| // expected |
| } |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(1, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| |
| assertEquals(1, claimContents.size()); |
| assertArrayEquals("hellogreetings".getBytes(), claimContents.values().iterator().next()); |
| |
| assertEquals(0, flowFileRepoUpdateRecords.size()); |
| assertEquals(0, provRepoUpdateRecords.size()); |
| assertEquals(0, flowFileQueuePutRecords.size()); |
| } |
| |
| @Test |
| public void testBadChecksum() throws IOException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("uuid", "unit-test-id"); |
| |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent("hello".getBytes(), dos); |
| dos.write(NO_MORE_FLOWFILES); |
| |
| dos.writeLong(1L); // Write bad checksum. |
| dos.write(COMPLETE_TRANSACTION); |
| |
| try { |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| Assert.fail("Expected TransactionAbortedException but none was thrown"); |
| } catch (final TransactionAbortedException e) { |
| // expected |
| } |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(2, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| assertEquals(REJECT_CHECKSUM, serverResponse[1]); |
| |
| assertEquals(1, claimContents.size()); |
| final byte[] firstFlowFileContent = claimContents.values().iterator().next(); |
| assertArrayEquals("hello".getBytes(), firstFlowFileContent); |
| |
| Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); |
| Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); |
| Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); |
| Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next()); |
| Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next()); |
| Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next()); |
| } |
| |
| @Test |
| public void testEofWritingContent() throws IOException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("uuid", "unit-test-id"); |
| |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| |
| // Indicate 45 byte data frame, then stop after 5 bytes. |
| dos.write(DATA_FRAME_FOLLOWS); |
| dos.writeShort(45); |
| dos.write("hello".getBytes()); |
| dos.flush(); |
| dos.close(); |
| |
| try { |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| Assert.fail("Expected EOFException but none was thrown"); |
| } catch (final EOFException e) { |
| // expected |
| } |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(1, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| |
| assertEquals(1, claimContents.size()); |
| final byte[] firstFlowFileContent = claimContents.values().iterator().next(); |
| assertArrayEquals(new byte[0], firstFlowFileContent); |
| |
| Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); |
| Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); |
| Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); |
| Mockito.verify(contentRepo, times(0)).incrementClaimaintCount(claimContents.keySet().iterator().next()); |
| Mockito.verify(contentRepo, times(0)).decrementClaimantCount(claimContents.keySet().iterator().next()); |
| Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next()); |
| } |
| |
| @Test |
| public void testAbortAfterChecksumConfirmation() throws IOException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("uuid", "unit-test-id"); |
| |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent("hello".getBytes(), dos); |
| dos.write(NO_MORE_FLOWFILES); |
| |
| dos.writeLong(checksum.getValue()); |
| dos.write(ABORT_TRANSACTION); |
| |
| try { |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| Assert.fail("Expected TransactionAbortedException but none was thrown"); |
| } catch (final TransactionAbortedException e) { |
| // expected |
| } |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(2, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); |
| |
| assertEquals(1, claimContents.size()); |
| final byte[] firstFlowFileContent = claimContents.values().iterator().next(); |
| assertArrayEquals("hello".getBytes(), firstFlowFileContent); |
| |
| Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); |
| Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); |
| Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); |
| Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next()); |
| Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next()); |
| Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next()); |
| } |
| |
| @Test |
| public void testFlowFileNoContent() throws IOException, IllegalClusterStateException { |
| final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); |
| |
| final PipedInputStream serverInput = new PipedInputStream(); |
| final PipedOutputStream serverContentSource = new PipedOutputStream(); |
| serverInput.connect(serverContentSource); |
| |
| final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); |
| |
| // Write connection ID |
| final Checksum checksum = new CRC32(); |
| final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); |
| final DataOutputStream dos = new DataOutputStream(checkedOutput); |
| dos.writeUTF("unit-test-connection-id"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("uuid", "unit-test-id"); |
| |
| dos.write(CHECK_SPACE); |
| dos.write(MORE_FLOWFILES); |
| writeAttributes(attributes, dos); |
| writeContent(null, dos); |
| dos.write(NO_MORE_FLOWFILES); |
| |
| dos.writeLong(checksum.getValue()); |
| dos.write(COMPLETE_TRANSACTION); |
| |
| protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1); |
| |
| final byte[] serverResponse = serverOutput.toByteArray(); |
| assertEquals(3, serverResponse.length); |
| assertEquals(SPACE_AVAILABLE, serverResponse[0]); |
| assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); |
| assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]); |
| |
| assertEquals(1, claimContents.size()); |
| assertEquals(0, claimContents.values().iterator().next().length); |
| |
| Mockito.verify(flowFileRepo, times(1)).updateRepository(anyCollection()); |
| Mockito.verify(provenanceRepo, times(1)).registerEvents(anyList()); |
| Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); |
| Mockito.verify(flowFileQueue, times(1)).receiveFromPeer(anyCollection()); |
| } |
| |
| private void writeAttributes(final Map<String, String> attributes, final DataOutputStream dos) throws IOException { |
| try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| final DataOutputStream out = new DataOutputStream(baos)) { |
| out.writeInt(attributes.size()); |
| |
| for (final Map.Entry<String, String> entry : attributes.entrySet()) { |
| final byte[] key = entry.getKey().getBytes(); |
| out.writeInt(key.length); |
| out.write(key); |
| |
| final byte[] value = entry.getValue().getBytes(); |
| out.writeInt(value.length); |
| out.write(value); |
| } |
| |
| out.writeLong(0L); // lineage start date |
| out.writeLong(0L); // entry date |
| out.writeLong(0L); // penalty expiration time |
| |
| dos.writeInt(baos.size()); |
| baos.writeTo(dos); |
| } |
| |
| } |
| |
| private void writeContent(final byte[] content, final DataOutputStream out) throws IOException { |
| if (content == null) { |
| out.write(NO_DATA_FRAME); |
| return; |
| } |
| |
| int iterations = content.length / 65535; |
| if (content.length % 65535 > 0) { |
| iterations++; |
| } |
| |
| for (int i=0; i < iterations; i++) { |
| final int offset = i * 65536; |
| final int length = Math.min(content.length - offset, 65535); |
| |
| out.write(DATA_FRAME_FOLLOWS); |
| out.writeInt(length); |
| out.write(content, offset, length); |
| } |
| |
| out.write(NO_DATA_FRAME); |
| } |
| } |