/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance.journaling.partition; | |
import java.io.File; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; | |
import org.apache.nifi.provenance.journaling.index.IndexManager; | |
import org.apache.nifi.util.Tuple; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class QueuingPartitionManager implements PartitionManager { | |
private static final Logger logger = LoggerFactory.getLogger(QueuingPartitionManager.class); | |
private final IndexManager indexManager; | |
private final JournalingRepositoryConfig config; | |
private final BlockingQueue<Partition> partitionQueue; | |
private final JournalingPartition[] partitionArray; | |
private final AtomicLong eventIdGenerator; | |
private volatile boolean shutdown = false; | |
private final Set<Partition> blackListedPartitions = Collections.synchronizedSet(new HashSet<Partition>()); | |
public QueuingPartitionManager(final IndexManager indexManager, final AtomicLong eventIdGenerator, final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService compressionExecutor) throws IOException { | |
this.indexManager = indexManager; | |
this.config = config; | |
this.eventIdGenerator = eventIdGenerator; | |
// We can consider using a PriorityQueue here instead. Keep track of how many Partitions are being written | |
// to for each container, as a container usually maps to a physical drive. Then, prioritize the queue | |
// so that the partitions that belong to Container A get a higher priority than those belonging to Container B | |
// if there are currently more partitions on Container B being written to (i.e., we prefer a partition for the | |
// container that is the least used at this moment). Would require significant performance testing to see if it | |
// really provides any benefit. | |
this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount()); | |
this.partitionArray = new JournalingPartition[config.getPartitionCount()]; | |
final List<Tuple<String, File>> containerTuples = new ArrayList<>(config.getContainers().size()); | |
for ( final Map.Entry<String, File> entry : config.getContainers().entrySet() ) { | |
containerTuples.add(new Tuple<>(entry.getKey(), entry.getValue())); | |
} | |
final Map<String, AtomicLong> containerSizes = new HashMap<>(); | |
for ( final String containerName : config.getContainers().keySet() ) { | |
containerSizes.put(containerName, new AtomicLong(0L)); | |
} | |
for (int i=0; i < config.getPartitionCount(); i++) { | |
final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size()); | |
final File section = new File(tuple.getValue(), String.valueOf(i)); | |
final String containerName = tuple.getKey(); | |
final JournalingPartition partition = new JournalingPartition(indexManager, containerName, i, | |
section, config, containerSizes.get(containerName), compressionExecutor); | |
partition.restore(); | |
partitionQueue.offer(partition); | |
partitionArray[i] = partition; | |
} | |
workerExecutor.scheduleWithFixedDelay(new CheckBlackListedPartitions(), 30, 30, TimeUnit.SECONDS); | |
} | |
@Override | |
public void shutdown() { | |
this.shutdown = true; | |
for ( final Partition partition : partitionArray ) { | |
partition.shutdown(); | |
} | |
} | |
Partition nextPartition(final boolean writeAction, final boolean waitIfNeeded) { | |
Partition partition = null; | |
final List<Partition> partitionsSkipped = new ArrayList<>(); | |
try { | |
while (partition == null) { | |
if (shutdown) { | |
throw new RuntimeException("Journaling Provenance Repository is shutting down"); | |
} | |
try { | |
partition = partitionQueue.poll(1, TimeUnit.SECONDS); | |
} catch (final InterruptedException ie) { | |
} | |
if ( partition == null ) { | |
if ( blackListedPartitions.size() >= config.getPartitionCount() ) { | |
throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures"); | |
} | |
// we are out of partitions. Add back all of the partitions that we skipped so we | |
// can try them again. | |
partitionQueue.addAll(partitionsSkipped); | |
partitionsSkipped.clear(); | |
} else if (writeAction) { | |
if ( waitIfNeeded ) { | |
// determine if the container is full. | |
final String containerName = partition.getContainerName(); | |
long desiredMaxContainerCapacity = config.getMaxCapacity(containerName); | |
// If no max capacity set for the container itself, use 1/N of repo max | |
// where N is the number of containers | |
if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) { | |
desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size(); | |
} | |
// if the partition is more than 10% over its desired capacity, we don't want to write to it. | |
if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) { | |
partitionsSkipped.add(partition); | |
continue; | |
} | |
} else { | |
return null; | |
} | |
} | |
} | |
} finally { | |
partitionQueue.addAll( partitionsSkipped ); | |
} | |
return partition; | |
} | |
private void blackList(final Partition partition) { | |
blackListedPartitions.add(partition); | |
} | |
@Override | |
public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException { | |
final Partition partition = nextPartition(writeAction, true); | |
boolean ioe = false; | |
try { | |
return action.perform(partition); | |
} catch (final IOException e) { | |
ioe = true; | |
throw e; | |
} finally { | |
if ( ioe && writeAction ) { | |
blackList(partition); | |
} else { | |
partitionQueue.offer(partition); | |
} | |
} | |
} | |
@Override | |
public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException { | |
final Partition partition = nextPartition(writeAction, true); | |
boolean ioe = false; | |
try { | |
action.perform(partition); | |
} catch (final IOException e) { | |
ioe = true; | |
throw e; | |
} finally { | |
if ( ioe && writeAction ) { | |
blackList(partition); | |
} else { | |
partitionQueue.offer(partition); | |
} | |
} | |
} | |
@Override | |
public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action, final boolean writeAction) throws IOException { | |
if ( writeAction && blackListedPartitions.size() > 0 ) { | |
throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)"); | |
} | |
final Set<T> results = new HashSet<>(partitionArray.length); | |
for ( final Partition partition : partitionArray ) { | |
results.add( action.perform(partition) ); | |
} | |
return results; | |
} | |
@Override | |
public void withEachPartitionSerially(final VoidPartitionAction action, final boolean writeAction) throws IOException { | |
if ( writeAction && blackListedPartitions.size() > 0 ) { | |
throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)"); | |
} | |
for ( final Partition partition : partitionArray ) { | |
action.perform(partition); | |
} | |
} | |
private long getTotalSize() { | |
long totalSize = 0L; | |
for ( final JournalingPartition partition : partitionArray ) { | |
totalSize += partition.getPartitionSize(); | |
} | |
for ( final String containerName : config.getContainers().keySet() ) { | |
totalSize += indexManager.getSize(containerName); | |
} | |
return totalSize; | |
} | |
/** | |
* Responsible for looking at partitions that have been marked as blacklisted and checking if they | |
* are able to be written to now. If so, adds them back to the partition queue; otherwise, leaves | |
* them as blacklisted | |
*/ | |
private class CheckBlackListedPartitions implements Runnable { | |
@Override | |
public void run() { | |
final Set<Partition> reclaimed = new HashSet<>(); | |
final Set<Partition> partitions = new HashSet<>(blackListedPartitions); | |
for ( final Partition partition : partitions ) { | |
final long nextId = eventIdGenerator.get(); | |
if ( nextId <= 0 ) { | |
// we don't have an ID to use yet. Don't attempt to do anything yet. | |
return; | |
} | |
try { | |
partition.verifyWritable(nextId); | |
reclaimed.add(partition); | |
} catch (final IOException ioe) { | |
logger.debug("{} is still blackListed due to {}", partition, ioe); | |
} | |
} | |
// any partition that is reclaimable is now removed from the set of blacklisted | |
// partitions and added back to our queue of partitions | |
blackListedPartitions.removeAll(reclaimed); | |
partitionQueue.addAll(reclaimed); | |
} | |
} | |
@Override | |
public void deleteEventsBasedOnSize() { | |
final Map<String, List<JournalingPartition>> containerPartitionMap = new HashMap<>(); | |
for ( final JournalingPartition partition : partitionArray ) { | |
final String container = partition.getContainerName(); | |
List<JournalingPartition> list = containerPartitionMap.get(container); | |
if ( list == null ) { | |
list = new ArrayList<>(); | |
containerPartitionMap.put(container, list); | |
} | |
list.add(partition); | |
} | |
int iterations = 0; | |
for ( final String containerName : config.getContainers().keySet() ) { | |
// continue as long as we need to delete data from this container. | |
while (true) { | |
// don't hammer the disks if we can't delete anything | |
if ( iterations++ > 0 ) { | |
try { | |
Thread.sleep(1000L); | |
} catch (final InterruptedException ie) {} | |
} | |
final List<JournalingPartition> containerPartitions = containerPartitionMap.get(containerName); | |
final long containerSize = containerPartitions.get(0).getContainerSize(); | |
final long maxContainerCapacity = config.getMaxCapacity(containerName); | |
if ( containerSize < maxContainerCapacity ) { | |
break; | |
} | |
logger.debug("Container {} exceeds max capacity of {} bytes with a size of {} bytes; deleting oldest events", containerName, maxContainerCapacity, containerSize); | |
// container is too large. Delete oldest journal from each partition in this container. | |
for ( final Partition partition : containerPartitions ) { | |
try { | |
partition.deleteOldest(); | |
} catch (final IOException ioe) { | |
logger.error("Failed to delete events from {} due to {}", partition, ioe.toString()); | |
if ( logger.isDebugEnabled() ) { | |
logger.error("", ioe); | |
} | |
} | |
} | |
} | |
} | |
long totalSize; | |
iterations = 0; | |
while ((totalSize = getTotalSize()) >= config.getMaxStorageCapacity()) { | |
logger.debug("Provenance Repository exceeds max capacity of {} bytes with a size of {}; deleting oldest events", config.getMaxStorageCapacity(), totalSize); | |
// don't hammer the disks if we can't delete anything | |
if ( iterations++ > 0 ) { | |
try { | |
Thread.sleep(1000L); | |
} catch (final InterruptedException ie) {} | |
} | |
for ( final Partition partition : partitionArray ) { | |
try { | |
partition.deleteOldest(); | |
} catch (final IOException ioe) { | |
logger.error("Failed to delete events from {} due to {}", partition, ioe.toString()); | |
if ( logger.isDebugEnabled() ) { | |
logger.error("", ioe); | |
} | |
} | |
} | |
// don't hammer the disks if we can't delete anything | |
try { | |
Thread.sleep(1000L); | |
} catch (final InterruptedException ie) {} | |
} | |
} | |
} |