blob: 48de1bf00722259979b6091c13567d9adf3c21d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
public class TestFileSystemSwapManager {
@Test
public void testBackwardCompatible() throws IOException {
try (final InputStream fis = new FileInputStream(new File("src/test/resources/old-swap-file.swap"));
final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final FileSystemSwapManager swapManager = createSwapManager();
final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
final List<FlowFileRecord> records = swapContents.getFlowFiles();
assertEquals(10000, records.size());
for (final FlowFileRecord record : records) {
assertEquals(4, record.getAttributes().size());
assertEquals("value", record.getAttribute("key"));
}
}
}
@Test
public void testFailureOnRepoSwapOut() throws IOException {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
Mockito.doThrow(new IOException("Intentional IOException for unit test"))
.when(flowFileRepo).swapFlowFilesOut(any(), any(), any());
final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo);
final List<FlowFileRecord> flowFileRecords = new ArrayList<>();
for (int i=0; i < 10000; i++) {
flowFileRecords.add(new MockFlowFileRecord(i));
}
try {
swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1");
Assert.fail("Expected IOException");
} catch (final IOException ioe) {
// expected
}
}
@Test
public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("");
final File targetDir = new File("target/swap");
targetDir.mkdirs();
final File targetFile = new File(targetDir, "444-old-swap-file.swap");
final File originalSwapFile = new File("src/test/resources/swap/444-old-swap-file.swap");
try (final OutputStream fos = new FileOutputStream(targetFile);
final InputStream fis = new FileInputStream(originalSwapFile)) {
StreamUtils.copy(fis, fos);
}
final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target"));
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
swapManager.initialize(new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public FlowFileRepository getFlowFileRepository() {
return flowFileRepo;
}
@Override
public EventReporter getEventReporter() {
return EventReporter.NO_OP;
}
});
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(false);
final List<String> recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null);
assertEquals(1, recoveredLocations.size());
final String firstLocation = recoveredLocations.get(0);
final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue);
assertEquals(0, emptyContents.getFlowFiles().size());
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(true);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final SwapContents contents = swapManager.swapIn(firstLocation, flowFileQueue);
assertEquals(10000, contents.getFlowFiles().size());
}
private FileSystemSwapManager createSwapManager() throws IOException {
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
return createSwapManager(flowFileRepo);
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private FileSystemSwapManager createSwapManager(final FlowFileRepository flowFileRepo) throws IOException {
final FileSystemSwapManager swapManager = new FileSystemSwapManager(temporaryFolder.newFolder().toPath());
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
swapManager.initialize(new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public FlowFileRepository getFlowFileRepository() {
return flowFileRepo;
}
@Override
public EventReporter getEventReporter() {
return EventReporter.NO_OP;
}
});
return swapManager;
}
public class NopResourceClaimManager implements ResourceClaimManager {
@Override
public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
return null;
}
@Override
public ResourceClaim getResourceClaim(String container, String section, String id) {
return null;
}
@Override
public int getClaimantCount(ResourceClaim claim) {
return 0;
}
@Override
public int decrementClaimantCount(ResourceClaim claim) {
return 0;
}
@Override
public int incrementClaimantCount(ResourceClaim claim) {
return 0;
}
@Override
public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) {
return 0;
}
@Override
public void markDestructable(ResourceClaim claim) {
}
@Override
public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements) {
}
@Override
public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit) {
}
@Override
public void purge() {
}
@Override
public void freeze(ResourceClaim claim) {
}
@Override
public boolean isDestructable(final ResourceClaim claim) {
return false;
}
}
}