blob: 90f1f3df3a30771a0f376227f7e2b2922a4a8f9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.integration.lifecycle;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.FileSystemRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
public class ContentCleanupIT extends FrameworkIntegrationTest {
@Test
public void testCompletedContentClaimCleanedUpOnCheckpoint() throws InterruptedException, IOException, ExecutionException {
final AtomicReference<FlowFileRecord> largeFlowFileReference = new AtomicReference<>();
final AtomicReference<FlowFileRecord> smallFlowFileReference = new AtomicReference<>();
// Processor to write 1 MB of content to a FlowFile
final ProcessorNode createLargeProcessor = createGenerateProcessor(1024 * 1024, largeFlowFileReference);
final ProcessorNode createSmallProcessor = createGenerateProcessor(5, smallFlowFileReference);
connect(createLargeProcessor, getTerminateProcessor(), REL_SUCCESS);
connect(createSmallProcessor, getTerminateProcessor(), REL_SUCCESS);
// Trigger the create processor.
triggerOnce(createLargeProcessor);
triggerOnce(createSmallProcessor);
// Ensure content available and has a claim count of 1.
final ContentClaim largeContentClaim = largeFlowFileReference.get().getContentClaim();
final ContentClaim smallContentClaim = smallFlowFileReference.get().getContentClaim();
assertNotEquals(largeContentClaim.getResourceClaim(), smallContentClaim.getResourceClaim());
assertEquals(1, getContentRepository().getClaimantCount(largeContentClaim));
assertEquals(1, getContentRepository().getClaimantCount(smallContentClaim));
// Ensure that content is still available and considered 'in use'
final FileSystemRepository fileSystemRepository = (FileSystemRepository) getContentRepository();
final Path largeClaimPath = fileSystemRepository.getPath(largeContentClaim, false);
final Path smallClaimPath = fileSystemRepository.getPath(smallContentClaim, false);
assertTrue(Files.exists(largeClaimPath));
assertTrue(largeContentClaim.getResourceClaim().isInUse());
assertTrue(Files.exists(smallClaimPath));
assertTrue(smallContentClaim.getResourceClaim().isInUse());
int recordCount = ((WriteAheadFlowFileRepository) getFlowFileRepository()).checkpoint();
assertEquals(2, recordCount);
// Trigger the delete Processor.
triggerOnce(getTerminateProcessor());
triggerOnce(getTerminateProcessor());
// Claim count should now be 0 and resource claim should not be in use.
assertEquals(0, getContentRepository().getClaimantCount(largeContentClaim));
assertEquals(0, getContentRepository().getClaimantCount(largeContentClaim));
assertFalse(largeContentClaim.getResourceClaim().isInUse());
assertTrue(smallContentClaim.getResourceClaim().isInUse());
// Checkpoint the FlowFile Repo
recordCount = ((WriteAheadFlowFileRepository) getFlowFileRepository()).checkpoint();
assertEquals(0, recordCount);
// Wait for the data to be deleted/archived.
waitForClaimDestruction(largeContentClaim);
assertTrue(Files.exists(smallClaimPath));
assertProvenanceEventCount(ProvenanceEventType.CREATE, 2);
assertProvenanceEventCount(ProvenanceEventType.DROP, 2);
}
@Test
public void testTransientClaimsNotHeld() throws ExecutionException, InterruptedException, IOException {
final AtomicReference<ContentClaim> claimReference = new AtomicReference<>();
final ProcessorNode processor = createProcessorNode((context, session) -> {
FlowFile flowFile = session.create();
for (int i=0; i < 1000; i++) {
final byte[] bytes = String.valueOf(i).getBytes();
flowFile = session.write(flowFile, out -> out.write(bytes));
}
// Write 1 MB to the content claim in order to ensure that the claim is no longer usable.
final byte[] oneMB = new byte[1024 * 1024];
flowFile = session.write(flowFile, out -> out.write(oneMB));
claimReference.set(((FlowFileRecord) flowFile).getContentClaim());
session.transfer(flowFile, REL_SUCCESS);
}, REL_SUCCESS);
connect(processor, getTerminateProcessor(), REL_SUCCESS);
triggerOnce(processor);
final int claimCount = getContentRepository().getClaimantCount(claimReference.get());
assertEquals(1, claimCount);
int recordCount = ((WriteAheadFlowFileRepository) getFlowFileRepository()).checkpoint();
assertEquals(1, recordCount);
assertTrue(claimReference.get().getResourceClaim().isInUse());
triggerOnce(getTerminateProcessor());
assertFalse(claimReference.get().getResourceClaim().isInUse());
recordCount = ((WriteAheadFlowFileRepository) getFlowFileRepository()).checkpoint();
assertEquals(0, recordCount);
waitForClaimDestruction(claimReference.get());
assertProvenanceEventCount(ProvenanceEventType.CREATE, 1);
assertProvenanceEventCount(ProvenanceEventType.DROP, 1);
}
@Test
public void testCloneIncrementsContentClaim() throws ExecutionException, InterruptedException, IOException {
final AtomicReference<FlowFileRecord> flowFileReference = new AtomicReference<>();
final ProcessorNode createProcessor = createGenerateProcessor(1024 * 1024, flowFileReference);
connect(createProcessor, getTerminateProcessor(), REL_SUCCESS);
connect(createProcessor, getTerminateAllProcessor(), REL_SUCCESS);
triggerOnce(createProcessor);
final ContentClaim contentClaim = flowFileReference.get().getContentClaim();
assertEquals(2, getContentRepository().getClaimantCount(contentClaim));
assertTrue(contentClaim.getResourceClaim().isInUse());
triggerOnce(getTerminateProcessor());
assertEquals(1, getContentRepository().getClaimantCount(contentClaim));
assertTrue(contentClaim.getResourceClaim().isInUse());
triggerOnce(getTerminateAllProcessor());
assertEquals(0, getContentRepository().getClaimantCount(contentClaim));
assertFalse(contentClaim.getResourceClaim().isInUse());
assertEquals(0, ((WriteAheadFlowFileRepository) getFlowFileRepository()).checkpoint());
waitForClaimDestruction(contentClaim);
}
private void waitForClaimDestruction(final ContentClaim contentClaim) {
final Path path = ((FileSystemRepository) getContentRepository()).getPath(contentClaim, false);
while (Files.exists(path)) {
try {
Thread.sleep(10L);
} catch (final Exception e) {
}
}
}
}