| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.controller; |
| |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.controller.queue.DropFlowFileState; |
| import org.apache.nifi.controller.queue.DropFlowFileStatus; |
| import org.apache.nifi.controller.queue.ListFlowFileState; |
| import org.apache.nifi.controller.queue.ListFlowFileStatus; |
| import org.apache.nifi.controller.queue.NopConnectionEventListener; |
| import org.apache.nifi.controller.queue.QueueSize; |
| import org.apache.nifi.controller.queue.StandardFlowFileQueue; |
| import org.apache.nifi.controller.repository.FlowFileRecord; |
| import org.apache.nifi.controller.repository.FlowFileRepository; |
| import org.apache.nifi.controller.repository.claim.ResourceClaimManager; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.FlowFilePrioritizer; |
| import org.apache.nifi.processor.FlowFileFilter; |
| import org.apache.nifi.provenance.ProvenanceEventRecord; |
| import org.apache.nifi.provenance.ProvenanceEventRepository; |
| import org.apache.nifi.provenance.ProvenanceEventType; |
| import org.apache.nifi.provenance.StandardProvenanceEventRecord; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| public class TestStandardFlowFileQueue { |
| private MockSwapManager swapManager = null; |
| private StandardFlowFileQueue queue = null; |
| |
| private Connection connection = null; |
| private FlowFileRepository flowFileRepo = null; |
| private ProvenanceEventRepository provRepo = null; |
| private ResourceClaimManager claimManager = null; |
| private ProcessScheduler scheduler = null; |
| |
| private List<ProvenanceEventRecord> provRecords = new ArrayList<>(); |
| |
| @BeforeClass |
| public static void setupLogging() { |
| System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG"); |
| } |
| |
| @Before |
| @SuppressWarnings("unchecked") |
| public void setup() { |
| provRecords.clear(); |
| |
| connection = Mockito.mock(Connection.class); |
| Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); |
| Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); |
| |
| scheduler = Mockito.mock(ProcessScheduler.class); |
| swapManager = new MockSwapManager(); |
| |
| flowFileRepo = Mockito.mock(FlowFileRepository.class); |
| provRepo = Mockito.mock(ProvenanceEventRepository.class); |
| claimManager = Mockito.mock(ResourceClaimManager.class); |
| |
| Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); |
| Mockito.doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(final InvocationOnMock invocation) throws Throwable { |
| final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0]; |
| for (final ProvenanceEventRecord record : iterable) { |
| provRecords.add(record); |
| } |
| return null; |
| } |
| }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); |
| |
| queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B"); |
| MockFlowFileRecord.resetIdGenerator(); |
| } |
| |
| @Test |
| public void testExpire() { |
| queue.setFlowFileExpiration("1 ms"); |
| |
| for (int i = 0; i < 100; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| // just make sure that the flowfiles have time to expire. |
| try { |
| Thread.sleep(100L); |
| } catch (final InterruptedException ie) { |
| } |
| |
| final Set<FlowFileRecord> expiredRecords = new HashSet<>(100); |
| final FlowFileRecord pulled = queue.poll(expiredRecords); |
| |
| assertNull(pulled); |
| assertEquals(100, expiredRecords.size()); |
| |
| final QueueSize activeSize = queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize(); |
| assertEquals(0, activeSize.getObjectCount()); |
| assertEquals(0L, activeSize.getByteCount()); |
| |
| final QueueSize unackSize = queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize(); |
| assertEquals(0, unackSize.getObjectCount()); |
| assertEquals(0L, unackSize.getByteCount()); |
| } |
| |
| @Test |
| public void testBackPressure() { |
| queue.setBackPressureObjectThreshold(10); |
| |
| assertTrue(queue.isEmpty()); |
| assertTrue(queue.isActiveQueueEmpty()); |
| assertFalse(queue.isFull()); |
| |
| for (int i = 0; i < 9; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertFalse(queue.isFull()); |
| assertFalse(queue.isEmpty()); |
| assertFalse(queue.isActiveQueueEmpty()); |
| } |
| |
| queue.put(new MockFlowFileRecord()); |
| assertTrue(queue.isFull()); |
| assertFalse(queue.isEmpty()); |
| assertFalse(queue.isActiveQueueEmpty()); |
| |
| final Set<FlowFileRecord> expiredRecords = new HashSet<>(); |
| final FlowFileRecord polled = queue.poll(expiredRecords); |
| assertNotNull(polled); |
| assertTrue(expiredRecords.isEmpty()); |
| |
| assertFalse(queue.isEmpty()); |
| assertFalse(queue.isActiveQueueEmpty()); |
| |
| // queue is still full because FlowFile has not yet been acknowledged. |
| assertTrue(queue.isFull()); |
| queue.acknowledge(polled); |
| |
| // FlowFile has been acknowledged; queue should no longer be full. |
| assertFalse(queue.isFull()); |
| assertFalse(queue.isEmpty()); |
| assertFalse(queue.isActiveQueueEmpty()); |
| } |
| |
| @Test |
| public void testBackPressureAfterPollFilter() throws InterruptedException { |
| queue.setBackPressureObjectThreshold(10); |
| queue.setFlowFileExpiration("10 millis"); |
| |
| for (int i = 0; i < 9; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertFalse(queue.isFull()); |
| } |
| |
| queue.put(new MockFlowFileRecord()); |
| assertTrue(queue.isFull()); |
| |
| Thread.sleep(100L); |
| |
| |
| final FlowFileFilter filter = new FlowFileFilter() { |
| @Override |
| public FlowFileFilterResult filter(final FlowFile flowFile) { |
| return FlowFileFilterResult.REJECT_AND_CONTINUE; |
| } |
| }; |
| |
| final Set<FlowFileRecord> expiredRecords = new HashSet<>(); |
| final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords); |
| assertTrue(polled.isEmpty()); |
| assertEquals(10, expiredRecords.size()); |
| |
| assertFalse(queue.isFull()); |
| assertTrue(queue.isEmpty()); |
| assertTrue(queue.isActiveQueueEmpty()); |
| } |
| |
| @Test(timeout = 10000) |
| public void testBackPressureAfterDrop() throws InterruptedException { |
| queue.setBackPressureObjectThreshold(10); |
| queue.setFlowFileExpiration("10 millis"); |
| |
| for (int i = 0; i < 9; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertFalse(queue.isFull()); |
| } |
| |
| queue.put(new MockFlowFileRecord()); |
| assertTrue(queue.isFull()); |
| |
| Thread.sleep(100L); |
| |
| final String requestId = UUID.randomUUID().toString(); |
| final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test"); |
| |
| while (status.getState() != DropFlowFileState.COMPLETE) { |
| Thread.sleep(10L); |
| } |
| |
| assertFalse(queue.isFull()); |
| assertTrue(queue.isEmpty()); |
| assertTrue(queue.isActiveQueueEmpty()); |
| |
| assertEquals(10, provRecords.size()); |
| for (final ProvenanceEventRecord event : provRecords) { |
| assertNotNull(event); |
| assertEquals(ProvenanceEventType.DROP, event.getEventType()); |
| } |
| } |
| |
| @Test |
| public void testBackPressureAfterPollSingle() throws InterruptedException { |
| queue.setBackPressureObjectThreshold(10); |
| queue.setFlowFileExpiration("10 millis"); |
| |
| for (int i = 0; i < 9; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertFalse(queue.isFull()); |
| } |
| |
| queue.put(new MockFlowFileRecord()); |
| assertTrue(queue.isFull()); |
| |
| Thread.sleep(100L); |
| |
| final Set<FlowFileRecord> expiredRecords = new HashSet<>(); |
| final FlowFileRecord polled = queue.poll(expiredRecords); |
| assertNull(polled); |
| assertEquals(10, expiredRecords.size()); |
| |
| assertFalse(queue.isFull()); |
| assertTrue(queue.isEmpty()); |
| assertTrue(queue.isActiveQueueEmpty()); |
| } |
| |
| @Test |
| public void testBackPressureAfterPollMultiple() throws InterruptedException { |
| queue.setBackPressureObjectThreshold(10); |
| queue.setFlowFileExpiration("10 millis"); |
| |
| for (int i = 0; i < 9; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertFalse(queue.isFull()); |
| } |
| |
| queue.put(new MockFlowFileRecord()); |
| assertTrue(queue.isFull()); |
| |
| Thread.sleep(100L); |
| |
| final Set<FlowFileRecord> expiredRecords = new HashSet<>(); |
| final List<FlowFileRecord> polled = queue.poll(10, expiredRecords); |
| assertTrue(polled.isEmpty()); |
| assertEquals(10, expiredRecords.size()); |
| |
| assertFalse(queue.isFull()); |
| assertTrue(queue.isEmpty()); |
| assertTrue(queue.isActiveQueueEmpty()); |
| } |
| |
| @Test |
| public void testSwapOutOccurs() { |
| for (int i = 0; i < 10000; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertEquals(0, swapManager.swapOutCalledCount); |
| assertEquals(i + 1, queue.size().getObjectCount()); |
| assertEquals(i + 1, queue.size().getByteCount()); |
| } |
| |
| for (int i = 0; i < 9999; i++) { |
| queue.put(new MockFlowFileRecord()); |
| assertEquals(0, swapManager.swapOutCalledCount); |
| assertEquals(i + 10001, queue.size().getObjectCount()); |
| assertEquals(i + 10001, queue.size().getByteCount()); |
| } |
| |
| queue.put(new MockFlowFileRecord(1000)); |
| assertEquals(1, swapManager.swapOutCalledCount); |
| assertEquals(20000, queue.size().getObjectCount()); |
| assertEquals(20999, queue.size().getByteCount()); |
| |
| assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| } |
| |
| @Test |
| public void testSwapIn() { |
| for (int i = 1; i <= 20000; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| assertEquals(1, swapManager.swappedOut.size()); |
| queue.put(new MockFlowFileRecord()); |
| assertEquals(1, swapManager.swappedOut.size()); |
| |
| final Set<FlowFileRecord> exp = new HashSet<>(); |
| for (int i = 0; i < 9999; i++) { |
| final FlowFileRecord flowFile = queue.poll(exp); |
| assertNotNull(flowFile); |
| assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); |
| assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount()); |
| |
| queue.acknowledge(Collections.singleton(flowFile)); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount()); |
| } |
| |
| assertEquals(0, swapManager.swapInCalledCount); |
| assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| assertNotNull(queue.poll(exp)); |
| |
| assertEquals(0, swapManager.swapInCalledCount); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| |
| assertEquals(1, swapManager.swapOutCalledCount); |
| |
| assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top. |
| assertEquals(1, swapManager.swapInCalledCount); |
| assertEquals(9999, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| |
| assertTrue(swapManager.swappedOut.isEmpty()); |
| |
| queue.poll(exp); |
| } |
| |
| @Test |
| public void testSwapInWhenThresholdIsLessThanSwapSize() { |
| // create a queue where the swap threshold is less than 10k |
| queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B"); |
| |
| for (int i = 1; i <= 20000; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| assertEquals(1, swapManager.swappedOut.size()); |
| queue.put(new MockFlowFileRecord()); |
| assertEquals(1, swapManager.swappedOut.size()); |
| |
| final Set<FlowFileRecord> exp = new HashSet<>(); |
| |
| // At this point there should be: |
| // 1k flow files in the active queue |
| // 9,001 flow files in the swap queue |
| // 10k flow files swapped to disk |
| |
| for (int i = 0; i < 999; i++) { // |
| final FlowFileRecord flowFile = queue.poll(exp); |
| assertNotNull(flowFile); |
| assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); |
| assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount()); |
| |
| queue.acknowledge(Collections.singleton(flowFile)); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount()); |
| } |
| |
| assertEquals(0, swapManager.swapInCalledCount); |
| assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| assertNotNull(queue.poll(exp)); |
| |
| assertEquals(0, swapManager.swapInCalledCount); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| |
| assertEquals(1, swapManager.swapOutCalledCount); |
| |
| assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top. |
| assertEquals(1, swapManager.swapInCalledCount); |
| assertEquals(9999, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| |
| assertTrue(swapManager.swappedOut.isEmpty()); |
| |
| queue.poll(exp); |
| } |
| |
| @Test |
| public void testQueueCountsUpdatedWhenIncompleteSwapFile() { |
| for (int i = 1; i <= 20000; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| assertEquals(20000, queue.size().getObjectCount()); |
| assertEquals(20000, queue.size().getByteCount()); |
| |
| assertEquals(1, swapManager.swappedOut.size()); |
| |
| // when we swap in, cause an IncompleteSwapFileException to be |
| // thrown and contain only 9,999 of the 10,000 FlowFiles |
| swapManager.enableIncompleteSwapFileException(9999); |
| final Set<FlowFileRecord> expired = Collections.emptySet(); |
| FlowFileRecord flowFile; |
| |
| for (int i = 0; i < 10000; i++) { |
| flowFile = queue.poll(expired); |
| assertNotNull(flowFile); |
| queue.acknowledge(Collections.singleton(flowFile)); |
| } |
| |
| // 10,000 FlowFiles on queue - all swapped out |
| assertEquals(10000, queue.size().getObjectCount()); |
| assertEquals(10000, queue.size().getByteCount()); |
| assertEquals(1, swapManager.swappedOut.size()); |
| assertEquals(0, swapManager.swapInCalledCount); |
| |
| // Trigger swap in. This will remove 1 FlowFile from queue, leaving 9,999 but |
| // on swap in, we will get only 9,999 FlowFiles put onto the queue, and the queue size will |
| // be decremented by 10,000 (because the Swap File's header tells us that there are 10K |
| // FlowFiles, even though only 9999 are in the swap file) |
| flowFile = queue.poll(expired); |
| assertNotNull(flowFile); |
| queue.acknowledge(Collections.singleton(flowFile)); |
| |
| // size should be 9,998 because we lost 1 on Swap In, and then we pulled one above. |
| assertEquals(9998, queue.size().getObjectCount()); |
| assertEquals(9998, queue.size().getByteCount()); |
| assertEquals(0, swapManager.swappedOut.size()); |
| assertEquals(1, swapManager.swapInCalledCount); |
| |
| for (int i = 0; i < 9998; i++) { |
| flowFile = queue.poll(expired); |
| assertNotNull("Null FlowFile when i = " + i, flowFile); |
| queue.acknowledge(Collections.singleton(flowFile)); |
| |
| final QueueSize queueSize = queue.size(); |
| assertEquals(9998 - i - 1, queueSize.getObjectCount()); |
| assertEquals(9998 - i - 1, queueSize.getByteCount()); |
| } |
| |
| final QueueSize queueSize = queue.size(); |
| assertEquals(0, queueSize.getObjectCount()); |
| assertEquals(0L, queueSize.getByteCount()); |
| |
| flowFile = queue.poll(expired); |
| assertNull(flowFile); |
| } |
| |
| @Test(timeout = 120000) |
| public void testDropSwappedFlowFiles() { |
| for (int i = 1; i <= 30000; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| assertEquals(2, swapManager.swappedOut.size()); |
| final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test"); |
| while (status.getState() != DropFlowFileState.COMPLETE) { |
| try { |
| Thread.sleep(100L); |
| } catch (final Exception e) { |
| } |
| } |
| |
| assertEquals(0, queue.size().getObjectCount()); |
| assertEquals(0, queue.size().getByteCount()); |
| assertEquals(0, swapManager.swappedOut.size()); |
| assertEquals(2, swapManager.swapInCalledCount); |
| } |
| |
| |
| @Test(timeout = 5000) |
| public void testListFlowFilesOnlyActiveQueue() throws InterruptedException { |
| for (int i = 0; i < 9999; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000); |
| assertNotNull(status); |
| assertEquals(9999, status.getQueueSize().getObjectCount()); |
| |
| while (status.getState() != ListFlowFileState.COMPLETE) { |
| Thread.sleep(100); |
| } |
| |
| assertEquals(9999, status.getFlowFileSummaries().size()); |
| assertEquals(100, status.getCompletionPercentage()); |
| assertNull(status.getFailureReason()); |
| } |
| |
| |
| @Test(timeout = 5000) |
| public void testListFlowFilesResultsLimited() throws InterruptedException { |
| for (int i = 0; i < 30050; i++) { |
| queue.put(new MockFlowFileRecord()); |
| } |
| |
| final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100); |
| assertNotNull(status); |
| assertEquals(30050, status.getQueueSize().getObjectCount()); |
| |
| while (status.getState() != ListFlowFileState.COMPLETE) { |
| Thread.sleep(100); |
| } |
| |
| assertEquals(100, status.getFlowFileSummaries().size()); |
| assertEquals(100, status.getCompletionPercentage()); |
| assertNull(status.getFailureReason()); |
| } |
| |
| @Test(timeout = 5000) |
| public void testListFlowFilesResultsLimitedCollection() throws InterruptedException { |
| Collection<FlowFileRecord> tff = new ArrayList<>(); |
| //Swap Size is 10000 records, so 30000 is equal to 3 swap files. |
| for (int i = 0; i < 30000; i++) { |
| tff.add(new MockFlowFileRecord()); |
| } |
| |
| queue.putAll(tff); |
| |
| final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100); |
| assertNotNull(status); |
| assertEquals(30000, status.getQueueSize().getObjectCount()); |
| |
| while (status.getState() != ListFlowFileState.COMPLETE) { |
| Thread.sleep(100); |
| } |
| |
| assertEquals(100, status.getFlowFileSummaries().size()); |
| assertEquals(100, status.getCompletionPercentage()); |
| assertNull(status.getFailureReason()); |
| } |
| |
| |
| @Test |
| public void testOOMEFollowedBySuccessfulSwapIn() { |
| final List<FlowFileRecord> flowFiles = new ArrayList<>(); |
| for (int i = 0; i < 50000; i++) { |
| flowFiles.add(new MockFlowFileRecord()); |
| } |
| |
| queue.putAll(flowFiles); |
| |
| swapManager.failSwapInAfterN = 2; |
| swapManager.setSwapInFailure(new OutOfMemoryError("Intentional OOME for unit test")); |
| |
| final Set<FlowFileRecord> expiredRecords = new HashSet<>(); |
| for (int i = 0; i < 30000; i++) { |
| final FlowFileRecord polled = queue.poll(expiredRecords); |
| assertNotNull(polled); |
| } |
| |
| // verify that unexpected ERROR's are handled in such a way that we keep retrying |
| for (int i = 0; i < 3; i++) { |
| try { |
| queue.poll(expiredRecords); |
| Assert.fail("Expected OOME to be thrown"); |
| } catch (final OutOfMemoryError oome) { |
| // expected |
| } |
| } |
| |
| // verify that unexpected Runtime Exceptions are handled in such a way that we keep retrying |
| swapManager.setSwapInFailure(new NullPointerException("Intentional OOME for unit test")); |
| |
| for (int i = 0; i < 3; i++) { |
| try { |
| queue.poll(expiredRecords); |
| Assert.fail("Expected NPE to be thrown"); |
| } catch (final NullPointerException npe) { |
| // expected |
| } |
| } |
| |
| swapManager.failSwapInAfterN = -1; |
| |
| for (int i = 0; i < 20000; i++) { |
| final FlowFileRecord polled = queue.poll(expiredRecords); |
| assertNotNull(polled); |
| } |
| |
| queue.acknowledge(flowFiles); |
| assertNull(queue.poll(expiredRecords)); |
| assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); |
| assertEquals(0, queue.size().getObjectCount()); |
| |
| assertTrue(swapManager.swappedOut.isEmpty()); |
| } |
| |
| @Test |
| public void testGetTotalActiveQueuedDuration() { |
| long now = System.currentTimeMillis(); |
| MockFlowFileRecord testFlowfile1 = new MockFlowFileRecord(); |
| testFlowfile1.setLastQueuedDate(now - 500); |
| MockFlowFileRecord testFlowfile2 = new MockFlowFileRecord(); |
| testFlowfile2.setLastQueuedDate(now - 1000); |
| |
| queue.put(testFlowfile1); |
| queue.put(testFlowfile2); |
| |
| assertEquals(1500, queue.getTotalQueuedDuration(now)); |
| queue.poll(1, Collections.emptySet()); |
| |
| assertEquals(1000, queue.getTotalQueuedDuration(now)); |
| } |
| |
| @Test |
| public void testGetMinLastQueueDate() { |
| long now = System.currentTimeMillis(); |
| MockFlowFileRecord testFlowfile1 = new MockFlowFileRecord(); |
| testFlowfile1.setLastQueuedDate(now - 1000); |
| MockFlowFileRecord testFlowfile2 = new MockFlowFileRecord(); |
| testFlowfile2.setLastQueuedDate(now - 500); |
| |
| queue.put(testFlowfile1); |
| queue.put(testFlowfile2); |
| |
| assertEquals(1000, now - queue.getMinLastQueueDate()); |
| queue.poll(1, Collections.emptySet()); |
| |
| assertEquals(500, now - queue.getMinLastQueueDate()); |
| } |
| |
| |
| private static class FlowFileSizePrioritizer implements FlowFilePrioritizer { |
| @Override |
| public int compare(final FlowFile o1, final FlowFile o2) { |
| return Long.compare(o1.getSize(), o2.getSize()); |
| } |
| } |
| } |