/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.nifi.provenance.journaling.tasks; | |
import java.io.EOFException; | |
import java.io.File; | |
import java.io.FileNotFoundException; | |
import java.io.IOException; | |
import java.util.Collections; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.nifi.provenance.ProvenanceEventRecord; | |
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer; | |
import org.apache.nifi.provenance.journaling.journals.DeflatorCompressionCodec; | |
import org.apache.nifi.provenance.journaling.journals.JournalReader; | |
import org.apache.nifi.provenance.journaling.journals.JournalWriter; | |
import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; | |
import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter; | |
import org.apache.nifi.provenance.journaling.toc.StandardTocReader; | |
import org.apache.nifi.provenance.journaling.toc.StandardTocWriter; | |
import org.apache.nifi.provenance.journaling.toc.TocReader; | |
import org.apache.nifi.provenance.journaling.toc.TocWriter; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* Compresses a journal file and returns the new size of the journal | |
*/ | |
public class CompressionTask implements Callable<Long> { | |
public static final String FILE_EXTENSION = ".compress"; | |
private static final Logger logger = LoggerFactory.getLogger(CompressionTask.class); | |
private final File journalFile; | |
private final long journalId; | |
private final File tocFile; | |
public CompressionTask(final File journalFile, final long journalId, final File tocFile) { | |
this.journalFile = journalFile; | |
this.journalId = journalId; | |
this.tocFile = tocFile; | |
} | |
public void compress(final JournalReader reader, final JournalWriter writer, final TocReader tocReader, final TocWriter tocWriter) throws IOException { | |
ProvenanceEventRecord event; | |
int blockIndex = 0; | |
long blockOffset = tocReader.getBlockOffset(blockIndex); | |
tocWriter.addBlockOffset(blockOffset); | |
long nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1); | |
// we write the events one at a time here so that we can ensure that when the block | |
// changes we are able to insert a new block into the TOC, as the blocks have to contain | |
// the same number of events, since the index just knows about the block index. | |
try { | |
while ((event = reader.nextEvent()) != null) { | |
// Check if we've gone beyond the offset of the next block. If so, write | |
// out a new block in the TOC. | |
final long newPosition = reader.getPosition(); | |
if ( newPosition > nextBlockOffset && nextBlockOffset > 0 ) { | |
blockIndex++; | |
blockOffset = tocReader.getBlockOffset(blockIndex); | |
tocWriter.addBlockOffset(writer.getSize()); | |
nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1); | |
} | |
// Write the event to the compressed writer | |
writer.write(Collections.singleton(event), event.getEventId()); | |
} | |
} catch (final EOFException eof) { | |
logger.warn("Found unexpected End-of-File when compressing {}", reader); | |
} | |
} | |
/** | |
* Attempts to delete the given file up to 10 times, waiting a bit in between each failed | |
* iteration, in case another process (for example, a virus scanner) has the file locked | |
* | |
* @param file | |
* @return | |
*/ | |
private boolean delete(final File file) { | |
for (int i=0; i < 10; i++) { | |
if ( file.delete() || !file.exists() ) { | |
return true; | |
} | |
try { | |
Thread.sleep(100L); | |
} catch (final InterruptedException ie) { | |
} | |
} | |
return !file.exists(); | |
} | |
/** | |
* Attempts to rename the given original file to the renamed file up to 20 times, waiting a bit | |
* in between each failed iteration, in case another process (for example, a virus scanner) has | |
* the file locked | |
* | |
* @param original | |
* @param renamed | |
* @return | |
*/ | |
public static boolean rename(final File original, final File renamed) { | |
for (int i=0; i < 20; i++) { | |
if ( original.renameTo(renamed) ) { | |
return true; | |
} | |
try { | |
Thread.sleep(100L); | |
} catch (final InterruptedException ie) { | |
} | |
} | |
return false; | |
} | |
@Override | |
public Long call() { | |
final long startNanos = System.nanoTime(); | |
final long preCompressionSize = journalFile.length(); | |
try { | |
final File compressedFile = new File(journalFile.getParentFile(), journalFile.getName() + FILE_EXTENSION); | |
final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + FILE_EXTENSION); | |
if ( compressedFile.exists() && !compressedFile.delete() ) { | |
logger.error("Compressed file {} already exists and could not remove it; compression task failed", compressedFile); | |
return preCompressionSize; | |
} | |
if ( compressedTocFile.exists() && !compressedTocFile.delete() ) { | |
logger.error("Compressed TOC file {} already exists and could not remove it; compression task failed", compressedTocFile); | |
return preCompressionSize; | |
} | |
try (final JournalReader journalReader = new StandardJournalReader(journalFile); | |
final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, new DeflatorCompressionCodec(), new StandardEventSerializer()); | |
final TocReader tocReader = new StandardTocReader(tocFile); | |
final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) { | |
compress(journalReader, compressedWriter, tocReader, compressedTocWriter); | |
compressedWriter.sync(); | |
} catch (final FileNotFoundException fnfe) { | |
logger.info("Failed to compress Journal File {} because it has already been removed", journalFile); | |
return 0L; | |
} | |
final long postCompressionSize = compressedFile.length(); | |
final boolean deletedJournal = delete(journalFile); | |
if ( !deletedJournal ) { | |
delete(compressedFile); | |
delete(compressedTocFile); | |
logger.error("Failed to remove Journal file {}; considering compression task a failure", journalFile); | |
return preCompressionSize; | |
} | |
final boolean deletedToc = delete(tocFile); | |
if ( !deletedToc ) { | |
delete(compressedFile); | |
delete(compressedTocFile); | |
logger.error("Failed to remove TOC file for {}; considering compression task a failure", journalFile); | |
return preCompressionSize; | |
} | |
final boolean renamedJournal = rename(compressedFile, journalFile); | |
if ( !renamedJournal ) { | |
logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedFile, journalFile); | |
} | |
final boolean renamedToc = rename(compressedTocFile, tocFile); | |
if ( !renamedToc ) { | |
logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedTocFile, tocFile); | |
} | |
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); | |
final double percent = (postCompressionSize / preCompressionSize) * 100D; | |
final String pct = String.format("%.2f", percent); | |
logger.info("Successfully compressed Journal File {} in {} millis; size changed from {} bytes to {} bytes ({}% of original size)", journalFile, millis, preCompressionSize, postCompressionSize, pct); | |
return postCompressionSize; | |
} catch (final IOException ioe) { | |
logger.error("Failed to compress Journal File {} due to {}", journalFile, ioe.toString()); | |
if ( logger.isDebugEnabled() ) { | |
logger.error("", ioe); | |
} | |
return preCompressionSize; | |
} | |
} | |
} |