// blob: 90a0dfbb4a454d91587f08c850b337f9218de409 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * In-memory {@link FlowFileSwapManager} for unit tests.
 *
 * <p>Swapped-out FlowFiles are kept in the public {@link #swappedOut} map, keyed by swap location.
 * Locations created by {@link #swapOut} have the form {@code <random UUID>.<partitionName>}.
 * The public counters and failure settings let a test observe how often swapping happened and
 * inject failures into swap-out, swap-in and peek.</p>
 *
 * <p>No synchronization is performed by this class.</p>
 */
public class MockSwapManager implements FlowFileSwapManager {
    /** Swapped-out FlowFiles, keyed by swap location. */
    public final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();

    /** Number of {@link #swapOut} invocations so far, including invocations that failed. */
    public int swapOutCalledCount = 0;

    /** Number of {@link #swapIn} invocations so far, including invocations that failed. */
    public int swapInCalledCount = 0;

    /**
     * When greater than -1, every {@link #peek} and {@link #swapIn} throws an
     * {@link IncompleteSwapFileException} carrying at most this many of the leading records.
     */
    public int incompleteSwapFileRecordsToInclude = -1;

    /**
     * When greater than -1, {@link #peek} and {@link #swapIn} fail once {@link #swapInCalledCount}
     * is strictly greater than this value.
     */
    public int failSwapInAfterN = -1;

    /** The failure raised for swap-in/peek once {@link #failSwapInAfterN} has been exceeded. */
    public Throwable failSwapInFailure = null;

    /** When greater than -1, {@link #swapOut} fails once {@link #swapOutCalledCount} reaches this value. */
    private int failSwapOutAfterN = -1;

    /** The exception thrown by a failing {@link #swapOut}; {@code null} selects a generated IOException. */
    private IOException failSwapOutFailure = null;

    /**
     * Sets the failure used once the swap-in failure threshold ({@link #failSwapInAfterN}) is exceeded.
     *
     * @param t the failure; a RuntimeException or Error is thrown as-is, anything else is wrapped in a RuntimeException
     */
    public void setSwapInFailure(final Throwable t) {
        this.failSwapInFailure = t;
    }

    /**
     * Makes {@link #swapOut} fail with a generated IOException from the n-th invocation onwards.
     *
     * @param n the invocation count at which swap-out starts failing; a value below 0 disables the failure
     */
    public void setSwapOutFailureOnNthIteration(final int n) {
        setSwapOutFailureOnNthIteration(n, null);
    }

    /**
     * Makes {@link #swapOut} fail from the n-th invocation onwards.
     *
     * @param n the invocation count at which swap-out starts failing; a value below 0 disables the failure
     * @param failureException the exception to throw, or {@code null} to throw a generated IOException
     */
    public void setSwapOutFailureOnNthIteration(final int n, final IOException failureException) {
        this.failSwapOutAfterN = n;
        this.failSwapOutFailure = failureException;
    }

    /** Does nothing; this mock needs no initialization. */
    @Override
    public void initialize(final SwapManagerInitializationContext initializationContext) {
    }

    /**
     * Makes every subsequent {@link #peek} and {@link #swapIn} throw an {@link IncompleteSwapFileException}.
     *
     * @param flowFilesToInclude the maximum number of leading records placed in the partial contents;
     *                           a value below 0 disables the behavior
     */
    public void enableIncompleteSwapFileException(final int flowFilesToInclude) {
        incompleteSwapFileRecordsToInclude = flowFilesToInclude;
    }

    /**
     * Stores a copy of the given FlowFiles under a newly generated swap location.
     *
     * @return the new swap location, {@code <random UUID>.<partitionName>}
     * @throws IOException if a swap-out failure was configured and this invocation has reached the threshold
     */
    @Override
    public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
        // Counted before the failure check, so failed attempts are included in the count.
        swapOutCalledCount++;

        if (failSwapOutAfterN > -1 && swapOutCalledCount >= failSwapOutAfterN) {
            if (failSwapOutFailure != null) {
                throw failSwapOutFailure;
            }
            throw new IOException("Intentional Unit Test IOException on swap out call number " + swapOutCalledCount);
        }

        final String location = UUID.randomUUID().toString() + "." + partitionName;
        swappedOut.put(location, new ArrayList<>(flowFiles));
        return location;
    }

    /**
     * Raises whichever failure is currently configured for reading a swap location, if any.
     *
     * @param swapLocation the location being read
     * @param remove whether the records are removed from {@link #swappedOut} when an incomplete swap file is reported
     * @throws IncompleteSwapFileException if incomplete swap files are enabled; it carries the summary of the
     *                                     records stored at the location and at most the configured number of them
     * @throws IOException declared because computing the summary may throw it
     */
    private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException {
        if (incompleteSwapFileRecordsToInclude > -1) {
            // Summarize before any removal; afterwards the location would no longer be found.
            final SwapSummary summary = getSwapSummary(swapLocation);
            final List<FlowFileRecord> records = remove ? swappedOut.remove(swapLocation) : swappedOut.get(swapLocation);

            // Copy rather than expose a subList view of the stored list. An unknown location is treated
            // as having no records, and the requested count is clamped to what is actually stored, so
            // that the intended IncompleteSwapFileException is raised instead of an NPE or
            // IndexOutOfBoundsException.
            final List<FlowFileRecord> partial = new ArrayList<>();
            if (records != null) {
                final int toInclude = Math.min(incompleteSwapFileRecordsToInclude, records.size());
                partial.addAll(records.subList(0, toInclude));
            }

            final SwapContents partialContents = new StandardSwapContents(summary, partial);
            throw new IncompleteSwapFileException(swapLocation, partialContents);
        }

        if (failSwapInAfterN > -1 && swapInCalledCount > failSwapInAfterN) {
            if (failSwapInFailure instanceof RuntimeException) {
                throw (RuntimeException) failSwapInFailure;
            }
            if (failSwapInFailure instanceof Error) {
                throw (Error) failSwapInFailure;
            }

            // Any other Throwable (including null) is surfaced as the cause of an unchecked exception.
            throw new RuntimeException(failSwapInFailure);
        }
    }

    /**
     * Returns the contents stored at the given location without removing them.
     *
     * @throws IOException if a configured failure applies (see {@link #enableIncompleteSwapFileException})
     */
    @Override
    public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
        throwIncompleteIfNecessary(swapLocation, false);
        return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation));
    }

    /**
     * Returns the contents stored at the given location and removes them from {@link #swappedOut}.
     *
     * @throws IOException if a configured failure applies (see {@link #enableIncompleteSwapFileException})
     */
    @Override
    public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
        swapInCalledCount++;
        throwIncompleteIfNecessary(swapLocation, true);

        // The summary must be computed while the records are still present in the map.
        final SwapSummary summary = getSwapSummary(swapLocation);
        return new StandardSwapContents(summary, swappedOut.remove(swapLocation));
    }

    /**
     * Returns every known swap location whose name ends with {@code "." + partitionName}.
     */
    @Override
    public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
        final String suffix = "." + partitionName;
        return swappedOut.keySet().stream()
            .filter(location -> location.endsWith(suffix))
            .collect(Collectors.toList());
    }

    /**
     * Summarizes the FlowFiles stored at the given location: count, total size, highest FlowFile id,
     * resource claims, and the minimum and sum of the last queue dates.
     *
     * @return the summary, or {@link StandardSwapSummary#EMPTY_SUMMARY} if the location is unknown
     */
    @Override
    public SwapSummary getSwapSummary(String swapLocation) throws IOException {
        final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
        if (flowFiles == null) {
            return StandardSwapSummary.EMPTY_SUMMARY;
        }

        int count = 0;
        long size = 0L;
        // Primitive accumulator: avoids unboxing and reboxing a Long on every iteration.
        long totalLastQueueDate = 0L;
        // These two remain null when the location holds an empty list.
        Long max = null;
        Long minLastQueueDate = null;
        final List<ResourceClaim> resourceClaims = new ArrayList<>();

        for (final FlowFileRecord flowFile : flowFiles) {
            count++;
            size += flowFile.getSize();

            if (max == null || flowFile.getId() > max) {
                max = flowFile.getId();
            }

            minLastQueueDate = minLastQueueDate == null ? flowFile.getLastQueueDate() : Long.min(minLastQueueDate, flowFile.getLastQueueDate());
            totalLastQueueDate += flowFile.getLastQueueDate();

            if (flowFile.getContentClaim() != null) {
                resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
            }
        }

        return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims, minLastQueueDate, totalLastQueueDate);
    }

    /** Discards everything that has been swapped out. */
    @Override
    public void purge() {
        swappedOut.clear();
    }

    /** Always returns {@code null}; this mock does not associate swap locations with a queue. */
    @Override
    public String getQueueIdentifier(final String swapLocation) {
        return null;
    }

    /**
     * Returns the distinct partition names of the known swap locations, where the partition name is
     * everything after the first {@code '.'} of a location. Locations without a dot are ignored.
     */
    @Override
    public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) throws IOException {
        return swappedOut.keySet().stream()
            .filter(key -> key.contains("."))
            .map(key -> key.substring(key.indexOf(".") + 1))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Moves the records stored at {@code swapLocation} to a location carrying the new partition name.
     * Everything after the first {@code '.'} of the old location is replaced by the new partition name;
     * if the old location contains no dot, {@code "." + newPartitionName} is appended.
     *
     * @return the new swap location
     * @throws IOException if nothing is stored at {@code swapLocation}
     */
    @Override
    public String changePartitionName(final String swapLocation, final String newPartitionName) throws IOException {
        final List<FlowFileRecord> flowFiles = swappedOut.remove(swapLocation);
        if (flowFiles == null) {
            throw new IOException("Could not find swapfile with name " + swapLocation);
        }

        final int dotIndex = swapLocation.indexOf(".");
        final String prefix = dotIndex < 0 ? swapLocation : swapLocation.substring(0, dotIndex);
        final String newSwapLocation = prefix + "." + newPartitionName;

        swappedOut.put(newSwapLocation, flowFiles);
        return newSwapLocation;
    }
}