| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| package org.apache.commons.compress.archivers.zip; |
| |
| import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore; |
| import org.apache.commons.compress.parallel.InputStreamSupplier; |
| import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier; |
| import org.apache.commons.compress.utils.IOUtils; |
| import org.junit.After; |
| import org.junit.Test; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.file.Files; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.zip.Deflater; |
| import java.util.zip.ZipEntry; |
| |
| import static org.apache.commons.compress.AbstractTestCase.getFile; |
| import static org.apache.commons.compress.AbstractTestCase.tryHardToDelete; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| public class ParallelScatterZipCreatorTest { |
| |
| private final int NUMITEMS = 5000; |
| private static final long EXPECTED_FILE_SIZE = 1024 * 1024; // 1MB |
| private static final int EXPECTED_FILES_NUMBER = 50; |
| |
| private File result; |
| private File tmp; |
| |
| @After |
| public void cleanup() { |
| tryHardToDelete(result); |
| tryHardToDelete(tmp); |
| } |
| |
| @Test |
| public void concurrent() |
| throws Exception { |
| result = File.createTempFile("parallelScatterGather1", ""); |
| final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result); |
| zos.setEncoding("UTF-8"); |
| final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(); |
| |
| final Map<String, byte[]> entries = writeEntries(zipCreator); |
| zipCreator.writeTo(zos); |
| zos.close(); |
| removeEntriesFoundInZipFile(result, entries); |
| assertTrue(entries.isEmpty()); |
| assertNotNull( zipCreator.getStatisticsMessage()); |
| } |
| |
| @Test |
| public void callableApiUsingSubmit() throws Exception { |
| result = File.createTempFile("parallelScatterGather2", ""); |
| callableApi(zipCreator -> c -> zipCreator.submit(c)); |
| } |
| |
| @Test |
| public void callableApiUsingSubmitStreamAwareCallable() throws Exception { |
| result = File.createTempFile("parallelScatterGather3", ""); |
| callableApi(zipCreator -> c -> zipCreator.submitStreamAwareCallable(c)); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void throwsExceptionWithCompressionLevelTooBig() throws Exception { |
| final int compressLevelTooBig = Deflater.BEST_COMPRESSION + 1; |
| final ExecutorService es = Executors.newFixedThreadPool(1); |
| final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); |
| |
| new ParallelScatterZipCreator(es, supp, compressLevelTooBig); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void throwsExceptionWithCompressionLevelTooSmall() throws Exception { |
| final int compressLevelTooSmall = Deflater.DEFAULT_COMPRESSION - 1; |
| final ExecutorService es = Executors.newFixedThreadPool(1); |
| final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); |
| |
| new ParallelScatterZipCreator(es, supp, compressLevelTooSmall); |
| } |
| |
| @Test |
| public void callableWithLowestLevelApiUsingSubmit() throws Exception { |
| result = File.createTempFile("parallelScatterGather4", ""); |
| callableApiWithTestFiles(zipCreator -> c -> zipCreator.submit(c), Deflater.NO_COMPRESSION); |
| } |
| |
| @Test |
| public void callableApiWithHighestLevelUsingSubmitStreamAwareCallable() throws Exception { |
| result = File.createTempFile("parallelScatterGather5", ""); |
| callableApiWithTestFiles(zipCreator -> c -> zipCreator.submitStreamAwareCallable(c), Deflater.BEST_COMPRESSION); |
| } |
| |
| private void callableApi(final CallableConsumerSupplier consumerSupplier) throws Exception { |
| callableApi(consumerSupplier, Deflater.DEFAULT_COMPRESSION); |
| } |
| |
| private void callableApi(final CallableConsumerSupplier consumerSupplier, final int compressionLevel) throws Exception { |
| final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result); |
| zos.setEncoding("UTF-8"); |
| final ExecutorService es = Executors.newFixedThreadPool(1); |
| |
| final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); |
| |
| final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel); |
| final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator)); |
| zipCreator.writeTo(zos); |
| zos.close(); |
| |
| |
| removeEntriesFoundInZipFile(result, entries); |
| assertTrue(entries.isEmpty()); |
| assertNotNull(zipCreator.getStatisticsMessage()); |
| } |
| |
| private void callableApiWithTestFiles(final CallableConsumerSupplier consumerSupplier, final int compressionLevel) throws Exception { |
| final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result); |
| zos.setEncoding("UTF-8"); |
| final ExecutorService es = Executors.newFixedThreadPool(1); |
| |
| final ScatterGatherBackingStoreSupplier supp = () -> new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); |
| |
| final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel); |
| final Map<String, byte[]> entries = writeTestFilesAsCallable(zipCreator, consumerSupplier.apply(zipCreator)); |
| zipCreator.writeTo(zos); |
| zos.close(); |
| |
| // validate the content of the compressed files |
| try (final ZipFile zf = new ZipFile(result)) { |
| final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder(); |
| while (entriesInPhysicalOrder.hasMoreElements()) { |
| final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement(); |
| final InputStream inputStream = zf.getInputStream(zipArchiveEntry); |
| final byte[] actual = IOUtils.toByteArray(inputStream); |
| final byte[] expected = entries.remove(zipArchiveEntry.getName()); |
| assertArrayEquals("For " + zipArchiveEntry.getName(), expected, actual); |
| } |
| } |
| assertNotNull(zipCreator.getStatisticsMessage()); |
| } |
| |
| private void removeEntriesFoundInZipFile(final File result, final Map<String, byte[]> entries) throws IOException { |
| final ZipFile zf = new ZipFile(result); |
| final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder(); |
| int i = 0; |
| while (entriesInPhysicalOrder.hasMoreElements()){ |
| final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement(); |
| final InputStream inputStream = zf.getInputStream(zipArchiveEntry); |
| final byte[] actual = IOUtils.toByteArray(inputStream); |
| final byte[] expected = entries.remove(zipArchiveEntry.getName()); |
| assertArrayEquals( "For " + zipArchiveEntry.getName(), expected, actual); |
| // check order of zip entries vs order of order of addition to the parallel zip creator |
| assertEquals( "For " + zipArchiveEntry.getName(), "file" + i++, zipArchiveEntry.getName()); |
| } |
| zf.close(); |
| } |
| |
| private Map<String, byte[]> writeEntries(final ParallelScatterZipCreator zipCreator) { |
| final Map<String, byte[]> entries = new HashMap<>(); |
| for (int i = 0; i < NUMITEMS; i++){ |
| final byte[] payloadBytes = ("content" + i).getBytes(); |
| final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes); |
| final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes); |
| if (i % 2 == 0) { |
| zipCreator.addArchiveEntry(za, iss); |
| } else { |
| final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss); |
| zipCreator.addArchiveEntry(zaSupplier); |
| } |
| } |
| return entries; |
| } |
| |
| private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator, |
| final CallableConsumer consumer) { |
| final Map<String, byte[]> entries = new HashMap<>(); |
| for (int i = 0; i < NUMITEMS; i++){ |
| final byte[] payloadBytes = ("content" + i).getBytes(); |
| final ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes); |
| final InputStreamSupplier iss = () -> new ByteArrayInputStream(payloadBytes); |
| final Callable<ScatterZipOutputStream> callable; |
| if (i % 2 == 0) { |
| callable = zipCreator.createCallable(za, iss); |
| } else { |
| final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(za, iss); |
| callable = zipCreator.createCallable(zaSupplier); |
| } |
| |
| consumer.accept(callable); |
| } |
| return entries; |
| } |
| |
| /** |
| * Try to compress the files in src/test/resources with size no bigger than |
| * {@value EXPECTED_FILES_NUMBER} and with a mount of files no bigger than |
| * {@value EXPECTED_FILES_NUMBER} |
| * |
| * @param zipCreator The ParallelScatterZipCreator |
| * @param consumer The parallel consumer |
| * @return A map using file name as key and file content as value |
| * @throws IOException if exceptions occur when opening files |
| */ |
| private Map<String, byte[]> writeTestFilesAsCallable(final ParallelScatterZipCreator zipCreator, |
| final CallableConsumer consumer) throws IOException { |
| final Map<String, byte[]> entries = new HashMap<>(); |
| final File baseDir = getFile(""); |
| int filesCount = 0; |
| for (final File file : baseDir.listFiles()) { |
| // do not compress too many files |
| if (filesCount >= EXPECTED_FILES_NUMBER) { |
| break; |
| } |
| |
| // skip files that are too large |
| if (file.isDirectory() || file.length() > EXPECTED_FILE_SIZE) { |
| continue; |
| } |
| |
| entries.put(file.getName(), IOUtils.toByteArray(Files.newInputStream(file.toPath()))); |
| |
| final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(file.getName()); |
| zipArchiveEntry.setMethod(ZipEntry.DEFLATED); |
| zipArchiveEntry.setSize(file.length()); |
| zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 0664); |
| |
| final InputStreamSupplier iss = () -> { |
| try { |
| return Files.newInputStream(file.toPath()); |
| } catch (final IOException e) { |
| return null; |
| } |
| }; |
| |
| final Callable<ScatterZipOutputStream> callable; |
| if (filesCount % 2 == 0) { |
| callable = zipCreator.createCallable(zipArchiveEntry, iss); |
| } else { |
| final ZipArchiveEntryRequestSupplier zaSupplier = () -> ZipArchiveEntryRequest.createZipArchiveEntryRequest(zipArchiveEntry, iss); |
| callable = zipCreator.createCallable(zaSupplier); |
| } |
| |
| consumer.accept(callable); |
| filesCount++; |
| } |
| return entries; |
| } |
| |
| private ZipArchiveEntry createZipArchiveEntry(final Map<String, byte[]> entries, final int i, final byte[] payloadBytes) { |
| final ZipArchiveEntry za = new ZipArchiveEntry( "file" + i); |
| entries.put( za.getName(), payloadBytes); |
| za.setMethod(ZipEntry.DEFLATED); |
| za.setSize(payloadBytes.length); |
| za.setUnixMode(UnixStat.FILE_FLAG | 0664); |
| return za; |
| } |
| |
| private interface CallableConsumer { |
| void accept(Callable<? extends ScatterZipOutputStream> c); |
| } |
| private interface CallableConsumerSupplier { |
| CallableConsumer apply(ParallelScatterZipCreator zipCreator); |
| } |
| } |