blob: 0bf8f4dc152568cdd75725b8bd0ab8b16068f3c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.commons.compress.archivers.zip;
import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
import org.apache.commons.compress.utils.IOUtils;
import org.junit.After;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import static org.apache.commons.compress.AbstractTestCase.getFile;
import static org.apache.commons.compress.AbstractTestCase.tryHardToDelete;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ParallelScatterZipCreatorTest {
private final int NUMITEMS = 5000;
private static final long EXPECTED_FILE_SIZE = 1024 * 1024; // 1MB
private static final int EXPECTED_FILES_NUMBER = 50;
private File result;
private File tmp;
@After
public void cleanup() {
tryHardToDelete(result);
tryHardToDelete(tmp);
}
@Test
public void concurrent()
throws Exception {
result = File.createTempFile("parallelScatterGather1", "");
final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
zos.setEncoding("UTF-8");
final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator();
final Map<String, byte[]> entries = writeEntries(zipCreator);
zipCreator.writeTo(zos);
zos.close();
removeEntriesFoundInZipFile(result, entries);
assertTrue(entries.isEmpty());
assertNotNull( zipCreator.getStatisticsMessage());
}
@Test
public void callableApiUsingSubmit() throws Exception {
result = File.createTempFile("parallelScatterGather2", "");
callableApi(zipCreator -> c -> zipCreator.submit(c));
}
@Test
public void callableApiUsingSubmitStreamAwareCallable() throws Exception {
result = File.createTempFile("parallelScatterGather3", "");
callableApi(zipCreator -> c -> zipCreator.submitStreamAwareCallable(c));
}
@Test(expected = IllegalArgumentException.class)
public void throwsExceptionWithCompressionLevelTooBig() throws Exception {
final int compressLevelTooBig = Deflater.BEST_COMPRESSION + 1;
final ExecutorService es = Executors.newFixedThreadPool(1);
final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1"));
new ParallelScatterZipCreator(es, supp, compressLevelTooBig);
}
@Test(expected = IllegalArgumentException.class)
public void throwsExceptionWithCompressionLevelTooSmall() throws Exception {
final int compressLevelTooSmall = Deflater.DEFAULT_COMPRESSION - 1;
final ExecutorService es = Executors.newFixedThreadPool(1);
final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1"));
new ParallelScatterZipCreator(es, supp, compressLevelTooSmall);
}
@Test
public void callableWithLowestLevelApiUsingSubmit() throws Exception {
result = File.createTempFile("parallelScatterGather4", "");
callableApiWithTestFiles(zipCreator -> c -> zipCreator.submit(c), Deflater.NO_COMPRESSION);
}
@Test
public void callableApiWithHighestLevelUsingSubmitStreamAwareCallable() throws Exception {
result = File.createTempFile("parallelScatterGather5", "");
callableApiWithTestFiles(zipCreator -> c -> zipCreator.submitStreamAwareCallable(c), Deflater.BEST_COMPRESSION);
}
private void callableApi(final CallableConsumerSupplier consumerSupplier) throws Exception {
callableApi(consumerSupplier, Deflater.DEFAULT_COMPRESSION);
}
private void callableApi(final CallableConsumerSupplier consumerSupplier, final int compressionLevel) throws Exception {
final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
zos.setEncoding("UTF-8");
final ExecutorService es = Executors.newFixedThreadPool(1);
final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1"));
final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel);
final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
zipCreator.writeTo(zos);
zos.close();
removeEntriesFoundInZipFile(result, entries);
assertTrue(entries.isEmpty());
assertNotNull(zipCreator.getStatisticsMessage());
}
private void callableApiWithTestFiles(final CallableConsumerSupplier consumerSupplier, final int compressionLevel) throws Exception {
final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
zos.setEncoding("UTF-8");
final ExecutorService es = Executors.newFixedThreadPool(1);
final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1"));
final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel);
final Map<String, byte[]> entries = writeTestFilesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
zipCreator.writeTo(zos);
zos.close();
// validate the content of the compressed files
try (final ZipFile zf = new ZipFile(result)) {
final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
while (entriesInPhysicalOrder.hasMoreElements()) {
final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
final InputStream inputStream = zf.getInputStream(zipArchiveEntry);
final byte[] actual = IOUtils.toByteArray(inputStream);
final byte[] expected = entries.remove(zipArchiveEntry.getName());
assertArrayEquals("For " + zipArchiveEntry.getName(), expected, actual);
}
}
assertNotNull(zipCreator.getStatisticsMessage());
}
private void removeEntriesFoundInZipFile(final File result, final Map<String, byte[]> entries) throws IOException {
final ZipFile zf = new ZipFile(result);
final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder();
int i = 0;
while (entriesInPhysicalOrder.hasMoreElements()){
final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement();
final InputStream inputStream = zf.getInputStream(zipArchiveEntry);
final byte[] actual = IOUtils.toByteArray(inputStream);
final byte[] expected = entries.remove(zipArchiveEntry.getName());
assertArrayEquals( "For " + zipArchiveEntry.getName(), expected, actual);
// check order of zip entries vs order of order of addition to the parallel zip creator
assertEquals( "For " + zipArchiveEntry.getName(), "file" + i++, zipArchiveEntry.getName());
}
zf.close();
}
private Map<String, byte[]> writeEntries(final ParallelScatterZipCreator zipCreator) {
final Map<String, byte[]> entries = new HashMap<>();
for (int i = 0; i < NUMITEMS; i++){
final byte[] payloadBytes = ("content" + i).getBytes();
final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes);
final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes);
if (i % 2 == 0) {
zipCreator.addArchiveEntry(za, iss);
} else {
final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss);
zipCreator.addArchiveEntry(zaSupplier);
}
}
return entries;
}
private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator,
final CallableConsumer consumer) {
final Map<String, byte[]> entries = new HashMap<>();
for (int i = 0; i < NUMITEMS; i++){
final byte[] payloadBytes = ("content" + i).getBytes();
final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes);
final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes);
final Callable<ScatterZipOutputStream> callable;
if (i % 2 == 0) {
callable = zipCreator.createCallable(za, iss);
} else {
final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss);
callable = zipCreator.createCallable(zaSupplier);
}
consumer.accept(callable);
}
return entries;
}
/**
* Try to compress the files in src/test/resources with size no bigger than
* {@value EXPECTED_FILES_NUMBER} and with a mount of files no bigger than
* {@value EXPECTED_FILES_NUMBER}
*
* @param zipCreator The ParallelScatterZipCreator
* @param consumer The parallel consumer
* @return A map using file name as key and file content as value
* @throws IOException if exceptions occur when opening files
*/
private Map<String, byte[]> writeTestFilesAsCallable(final ParallelScatterZipCreator zipCreator,
final CallableConsumer consumer) throws IOException {
final Map<String, byte[]> entries = new HashMap<>();
final File baseDir = getFile("");
int filesCount = 0;
for (final File file : baseDir.listFiles()) {
// do not compress too many files
if (filesCount >= EXPECTED_FILES_NUMBER) {
break;
}
// skip files that are too large
if (file.isDirectory() || file.length() > EXPECTED_FILE_SIZE) {
continue;
}
entries.put(file.getName(), IOUtils.toByteArray(Files.newInputStream(file.toPath())));
final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(file.getName());
zipArchiveEntry.setMethod(ZipEntry.DEFLATED);
zipArchiveEntry.setSize(file.length());
zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 0664);
final InputStreamSupplier iss = () -> {
try {
return Files.newInputStream(file.toPath());
} catch (final IOException e) {
return null;
}
};
final Callable<ScatterZipOutputStream> callable;
if (filesCount % 2 == 0) {
callable = zipCreator.createCallable(zipArchiveEntry, iss);
} else {
final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(zipArchiveEntry, iss);
callable = zipCreator.createCallable(zaSupplier);
}
consumer.accept(callable);
filesCount++;
}
return entries;
}
private ZipArchiveEntry createZipArchiveEntry(final Map<String, byte[]> entries, final int i, final byte[] payloadBytes) {
final ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
entries.put( za.getName(), payloadBytes);
za.setMethod(ZipEntry.DEFLATED);
za.setSize(payloadBytes.length);
za.setUnixMode(UnixStat.FILE_FLAG | 0664);
return za;
}
private interface CallableConsumer {
void accept(Callable<? extends ScatterZipOutputStream> c);
}
private interface CallableConsumerSupplier {
CallableConsumer apply(ParallelScatterZipCreator zipCreator);
}
}