/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.journaling.tasks;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
import org.apache.nifi.provenance.journaling.journals.DeflatorCompressionCodec;
import org.apache.nifi.provenance.journaling.journals.JournalReader;
import org.apache.nifi.provenance.journaling.journals.JournalWriter;
import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter;
import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
import org.apache.nifi.provenance.journaling.toc.StandardTocWriter;
import org.apache.nifi.provenance.journaling.toc.TocReader;
import org.apache.nifi.provenance.journaling.toc.TocWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Compresses a journal file and its table-of-contents (TOC) file in place and returns the
* size, in bytes, of the journal after compression
*/
public class CompressionTask implements Callable<Long> {
public static final String FILE_EXTENSION = ".compress";
private static final Logger logger = LoggerFactory.getLogger(CompressionTask.class);
private final File journalFile;
private final long journalId;
private final File tocFile;
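
/**
* @param journalFile the journal file to compress
* @param journalId the id of the journal being compressed
* @param tocFile the table-of-contents (TOC) file that corresponds to the journal file
*/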
public CompressionTask(final File journalFile, final long journalId, final File tocFile) {
this.journalFile = journalFile;
this.journalId = journalId;
this.tocFile = tocFile;
}
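
/**
* Copies each event from the given reader to the given (compressed) writer, recording a new
* block offset in the new TOC whenever the original TOC indicates a block boundary, so that
* every block in the compressed journal contains the same events as the corresponding block
* in the original journal
*
* @param reader the reader for the uncompressed journal
* @param writer the writer for the compressed journal
* @param tocReader the reader for the original TOC
* @param tocWriter the writer for the compressed journal's TOC
* @throws IOException if unable to read from or write to the journal or TOC files
*/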
public void compress(final JournalReader reader, final JournalWriter writer, final TocReader tocReader, final TocWriter tocWriter) throws IOException {
ProvenanceEventRecord event;
int blockIndex = 0;
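// Mirror the original TOC's first block offset into the new TOC before copying any events.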
long blockOffset = tocReader.getBlockOffset(blockIndex);
tocWriter.addBlockOffset(blockOffset);
long nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
// We write the events one at a time here so that, whenever we cross a block boundary, we can
// record a new block offset in the TOC. Each block in the compressed journal must contain the
// same events as the corresponding block in the original journal, since the index knows events
// only by their block index.
try {
while ((event = reader.nextEvent()) != null) {
// Check if we've gone beyond the offset of the next block. If so, write
// out a new block in the TOC.
final long newPosition = reader.getPosition();
if ( newPosition > nextBlockOffset && nextBlockOffset > 0 ) {
blockIndex++;
blockOffset = tocReader.getBlockOffset(blockIndex);
tocWriter.addBlockOffset(writer.getSize());
nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
}
// Write the event to the compressed writer
writer.write(Collections.singleton(event), event.getEventId());
}
} catch (final EOFException eof) {
logger.warn("Found unexpected End-of-File when compressing {}", reader);
}
}

/**
* Attempts to delete the given file up to 10 times, waiting a bit in between each failed
* attempt, in case another process (for example, a virus scanner) has the file locked
*
* @param file the file to delete
* @return <code>true</code> if the file was deleted or no longer exists, <code>false</code> otherwise
*/
private boolean delete(final File file) {
for (int i=0; i < 10; i++) {
if ( file.delete() || !file.exists() ) {
return true;
}
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
// Restore the interrupt status and stop retrying rather than silently swallowing the interruption
Thread.currentThread().interrupt();
break;
}
}
return !file.exists();
}

/**
* Attempts to rename the given original file to the renamed file up to 20 times, waiting a bit
* in between each failed attempt, in case another process (for example, a virus scanner) has
* the file locked
*
* @param original the file to rename
* @param renamed the desired new name/location for the file
* @return <code>true</code> if the rename succeeded, <code>false</code> otherwise
*/
public static boolean rename(final File original, final File renamed) {
for (int i=0; i < 20; i++) {
if ( original.renameTo(renamed) ) {
return true;
}
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
// Restore the interrupt status and stop retrying rather than silently swallowing the interruption
Thread.currentThread().interrupt();
break;
}
}
return false;
}
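
/**
* Compresses the journal and TOC files and swaps the compressed versions into the place of
* the originals.
*
* @return the size, in bytes, of the compressed journal on success; the original journal size
*         if compression fails; or 0 if the journal file had already been removed
*/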
@Override
public Long call() {
final long startNanos = System.nanoTime();
final long preCompressionSize = journalFile.length();
try {
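// Write the compressed copies to temporary ".compress" files next to the originals so that the
// original journal and TOC remain readable until compression completes.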
final File compressedFile = new File(journalFile.getParentFile(), journalFile.getName() + FILE_EXTENSION);
final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + FILE_EXTENSION);
if ( compressedFile.exists() && !compressedFile.delete() ) {
logger.error("Compressed file {} already exists and could not remove it; compression task failed", compressedFile);
return preCompressionSize;
}
if ( compressedTocFile.exists() && !compressedTocFile.delete() ) {
logger.error("Compressed TOC file {} already exists and could not remove it; compression task failed", compressedTocFile);
return preCompressionSize;
}
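// Stream the events from the original journal into the compressed journal, rebuilding the TOC
// to match, and sync the compressed writer before touching the original files.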
try (final JournalReader journalReader = new StandardJournalReader(journalFile);
final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, new DeflatorCompressionCodec(), new StandardEventSerializer());
final TocReader tocReader = new StandardTocReader(tocFile);
final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) {
compress(journalReader, compressedWriter, tocReader, compressedTocWriter);
compressedWriter.sync();
} catch (final FileNotFoundException fnfe) {
logger.info("Failed to compress Journal File {} because it has already been removed", journalFile);
return 0L;
}
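// Swap the compressed files into place: remove the originals, then rename the ".compress" files
// to the original names. If an original cannot be removed, discard the compressed copies and
// treat the task as a failure.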
final long postCompressionSize = compressedFile.length();
final boolean deletedJournal = delete(journalFile);
if ( !deletedJournal ) {
delete(compressedFile);
delete(compressedTocFile);
logger.error("Failed to remove Journal file {}; considering compression task a failure", journalFile);
return preCompressionSize;
}
final boolean deletedToc = delete(tocFile);
if ( !deletedToc ) {
delete(compressedFile);
delete(compressedTocFile);
logger.error("Failed to remove TOC file for {}; considering compression task a failure", journalFile);
return preCompressionSize;
}
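// Both originals are gone; give the compressed files the original names.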
final boolean renamedJournal = rename(compressedFile, journalFile);
if ( !renamedJournal ) {
logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedFile, journalFile);
}
final boolean renamedToc = rename(compressedTocFile, tocFile);
if ( !renamedToc ) {
logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedTocFile, tocFile);
}
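// Report how long compression took and how much the journal shrank.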
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
final double percent = (postCompressionSize * 100D) / preCompressionSize;
final String pct = String.format("%.2f", percent);
logger.info("Successfully compressed Journal File {} in {} millis; size changed from {} bytes to {} bytes ({}% of original size)", journalFile, millis, preCompressionSize, postCompressionSize, pct);
return postCompressionSize;
} catch (final IOException ioe) {
logger.error("Failed to compress Journal File {} due to {}", journalFile, ioe.toString());
if ( logger.isDebugEnabled() ) {
logger.error("", ioe);
}
return preCompressionSize;
}
}
}