blob: 6ef51151e6ee19f974539baf216fbc740b55d7a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.security.kms.EncryptionException;
import org.apache.nifi.security.kms.StaticKeyProvider;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
/**
* Test cases for {@link EncryptedFileSystemSwapManager}.
*/
public class TestEncryptedFileSystemSwapManager {
private static final Logger logger = Logger.getLogger(TestEncryptedFileSystemSwapManager.class.getName());
/**
* Test a simple swap to disk / swap from disk operation. Configured to use {@link StaticKeyProvider}.
*/
@Test
public void testSwapOutSwapIn() throws GeneralSecurityException, EncryptionException, IOException {
// use temp folder on filesystem to temporarily hold swap content (clean up after test)
final File folderRepository = Files.createTempDirectory(getClass().getSimpleName()).toFile();
logger.info(folderRepository.getPath());
folderRepository.deleteOnExit();
new File(folderRepository, "swap").deleteOnExit();
// configure a nifi properties for encrypted swap file
final Properties properties = new Properties();
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, StaticKeyProvider.class.getName());
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, StringUtils.repeat("00", 32));
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, folderRepository.getPath());
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
// generate some flow file content to swap to disk
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; (i < 100); ++i) {
flowFiles.add(new StandardFlowFileRecord.Builder().id(i).build());
}
// setup for test case
final FlowFileSwapManager swapManager = createSwapManager(nifiProperties);
final String queueIdentifier = UUID.randomUUID().toString();
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn(queueIdentifier);
// swap out to disk; pull content back from disk
final String swapPath = swapManager.swapOut(flowFiles, flowFileQueue, "partition-1");
final SwapContents swapContents = swapManager.swapIn(swapPath, flowFileQueue);
// verify recovery of original content
final List<FlowFileRecord> flowFilesRecovered = swapContents.getFlowFiles();
Assert.assertEquals(flowFiles.size(), flowFilesRecovered.size());
Assert.assertTrue(flowFilesRecovered.containsAll(flowFiles));
Assert.assertTrue(flowFiles.containsAll(flowFilesRecovered));
}
/**
* Borrowed from "nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java".
*/
private FlowFileSwapManager createSwapManager(NiFiProperties nifiProperties)
throws IOException, GeneralSecurityException, EncryptionException {
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
when(flowFileRepo.isValidSwapLocationSuffix(any())).thenReturn(true);
final FileSystemSwapManager swapManager = new EncryptedFileSystemSwapManager(nifiProperties);
final ResourceClaimManager resourceClaimManager = Mockito.mock(ResourceClaimManager.class);
final SwapManagerInitializationContext context = Mockito.mock(SwapManagerInitializationContext.class);
when(context.getResourceClaimManager()).thenReturn(resourceClaimManager);
when(context.getFlowFileRepository()).thenReturn(flowFileRepo);
when(context.getEventReporter()).thenReturn(EventReporter.NO_OP);
swapManager.initialize(context);
return swapManager;
}
}