/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.compress;
import com.google.common.io.Files;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import static org.apache.commons.io.FileUtils.readFileToByteArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.io.util.SequentialWriterTest;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.ChecksumType;
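/**
 * Exercises CompressedSequentialWriter with the LZ4, Deflate and Snappy compressors,
 * covering sub-chunk, chunk-aligned and multi-chunk writes as well as the
 * mark()/resetAndTruncate() behaviour and the transaction tests inherited from
 * SequentialWriterTest.
 */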
public class CompressedSequentialWriterTest extends SequentialWriterTest
{
private CompressionParams compressionParameters;
private void runTests(String testName) throws IOException
{
// Test small < 1 chunk data set
testWrite(File.createTempFile(testName + "_small", "1"), 25);
// Test to confirm pipeline w/chunk-aligned data writes works
testWrite(File.createTempFile(testName + "_chunkAligned", "1"), CompressionParams.DEFAULT_CHUNK_LENGTH);
// Test to confirm pipeline on non-chunk boundaries works
testWrite(File.createTempFile(testName + "_large", "1"), CompressionParams.DEFAULT_CHUNK_LENGTH * 3 + 100);
}
@Test
public void testLZ4Writer() throws IOException
{
compressionParameters = CompressionParams.lz4();
runTests("LZ4");
}
@Test
public void testDeflateWriter() throws IOException
{
compressionParameters = CompressionParams.deflate();
runTests("Deflate");
}
@Test
public void testSnappyWriter() throws IOException
{
compressionParameters = CompressionParams.snappy();
runTests("Snappy");
}
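// Writes bytesToTest random bytes, marks the position, writes a full chunk of filler,
// rewinds with resetAndTruncate, writes another bytesToTest random bytes, then reads the
// file back through CompressedRandomAccessReader and checks it matches the two retained writes.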
private void testWrite(File f, int bytesToTest) throws IOException
{
final String filename = f.getAbsolutePath();
final ChannelProxy channel = new ChannelProxy(f);
try
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Arrays.<AbstractType<?>>asList(BytesType.instance)));
byte[] dataPre = new byte[bytesToTest];
byte[] rawPost = new byte[bytesToTest];
try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, sstableMetadataCollector))
{
Random r = new Random(42);
// Test both the byte[] and ByteBuffer write paths
r.nextBytes(dataPre);
r.nextBytes(rawPost);
ByteBuffer dataPost = makeBB(bytesToTest);
dataPost.put(rawPost);
dataPost.flip();
writer.write(dataPre);
DataPosition mark = writer.mark();
// Write enough filler data to cross a chunk boundary
for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
{
writer.write((byte)i);
}
if (bytesToTest <= CompressionParams.DEFAULT_CHUNK_LENGTH)
assertEquals(writer.getLastFlushOffset(), CompressionParams.DEFAULT_CHUNK_LENGTH);
else
assertTrue(writer.getLastFlushOffset() % CompressionParams.DEFAULT_CHUNK_LENGTH == 0);
writer.resetAndTruncate(mark);
writer.write(dataPost);
writer.finish();
}
assert f.exists();
RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)).build();
assertEquals(dataPre.length + rawPost.length, reader.length());
byte[] result = new byte[(int)reader.length()];
reader.readFully(result);
assert(reader.isEOF());
reader.close();
byte[] fullInput = new byte[bytesToTest * 2];
System.arraycopy(dataPre, 0, fullInput, 0, dataPre.length);
System.arraycopy(rawPost, 0, fullInput, bytesToTest, rawPost.length);
assert Arrays.equals(result, fullInput);
}
finally
{
// cleanup
channel.close();
if (f.exists())
f.delete();
File metadata = new File(f + ".metadata");
if (metadata.exists())
metadata.delete();
}
}
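// Allocates a buffer of the compressor's preferred type (direct or heap) for the ByteBuffer write path.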
private ByteBuffer makeBB(int size)
{
return compressionParameters.getSstableCompressor().preferredBufferType().allocate(size);
}
private final List<TestableCSW> writers = new ArrayList<>();
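// Writers created via newTest() are deleted after each test.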
@After
public void cleanup()
{
for (TestableCSW sw : writers)
sw.cleanup();
writers.clear();
}
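// Verifies that mark()/resetAndTruncate() rewinds both the current position and the last
// flushed offset, and that a subsequent small write does not trigger another flush.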
@Test
@Override
public void resetAndTruncateTest()
{
File tempFile = new File(Files.createTempDir(), "reset.txt");
File offsetsFile = FileUtils.createTempFile("compressedsequentialwriter.offset", "test");
final int bufferSize = 48;
final int writeSize = 64;
byte[] toWrite = new byte[writeSize];
MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Arrays.<AbstractType<?>>asList(BytesType.instance)));
try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, offsetsFile.getPath(),
CompressionParams.lz4(), sstableMetadataCollector))
{
// write more bytes than the buffer size
writer.write(toWrite);
long flushedOffset = writer.getLastFlushOffset();
assertEquals(writeSize, writer.position());
// mark this position
DataPosition pos = writer.mark();
// write another
writer.write(toWrite);
// another buffer should be flushed
assertEquals(flushedOffset * 2, writer.getLastFlushOffset());
assertEquals(writeSize * 2, writer.position());
// reset writer
writer.resetAndTruncate(pos);
// current position and flushed offset should be back at the marked values
assertEquals(writeSize, writer.position());
assertEquals(flushedOffset, writer.getLastFlushOffset());
// write a single byte, less than the buffer size
writer.write(new byte[]{0});
assertEquals(writeSize + 1, writer.position());
// flush offset should not increase
assertEquals(flushedOffset, writer.getLastFlushOffset());
writer.finish();
}
catch (IOException e)
{
Assert.fail();
}
}
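// Supplies a TestableCSW to the transaction lifecycle tests inherited from
// SequentialWriterTest and registers it for cleanup after the test.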
protected TestableTransaction newTest() throws IOException
{
TestableCSW sw = new TestableCSW();
writers.add(sw);
return sw;
}
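// TestableSW wrapper around CompressedSequentialWriter that also tracks the
// offsets (metadata) file so its state can be asserted and cleaned up.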
private static class TestableCSW extends TestableSW
{
final File offsetsFile;
private TestableCSW() throws IOException
{
this(tempFile("compressedsequentialwriter"),
tempFile("compressedsequentialwriter.offsets"));
}
private TestableCSW(File file, File offsetsFile) throws IOException
{
this(file, offsetsFile, new CompressedSequentialWriter(file,
offsetsFile.getPath(),
CompressionParams.lz4(BUFFER_SIZE),
new MetadataCollector(new ClusteringComparator(UTF8Type.instance))));
}
private TestableCSW(File file, File offsetsFile, CompressedSequentialWriter sw) throws IOException
{
super(file, sw);
this.offsetsFile = offsetsFile;
}
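// While writing is in progress the data file exists but the offsets file has not been
// written yet; the flushed chunk, minus its trailing 4 bytes (the chunk checksum),
// should decompress back to partialContents.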
protected void assertInProgress() throws Exception
{
Assert.assertTrue(file.exists());
Assert.assertFalse(offsetsFile.exists());
byte[] compressed = readFileToByteArray(file);
byte[] uncompressed = new byte[partialContents.length];
LZ4Compressor.instance.uncompress(compressed, 0, compressed.length - 4, uncompressed, 0);
Assert.assertTrue(Arrays.equals(partialContents, uncompressed));
}
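// Once prepared, the offsets file is present. The reads below follow the serialized
// compression info: compressor class name, an int (taken here to be the option count, 0),
// the chunk length, the total uncompressed length, the chunk count (2) and one long offset
// per chunk; both chunks must decompress back to fullContents.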
protected void assertPrepared() throws Exception
{
Assert.assertTrue(file.exists());
Assert.assertTrue(offsetsFile.exists());
DataInputStream offsets = new DataInputStream(new ByteArrayInputStream(readFileToByteArray(offsetsFile)));
Assert.assertTrue(offsets.readUTF().endsWith("LZ4Compressor"));
Assert.assertEquals(0, offsets.readInt());
Assert.assertEquals(BUFFER_SIZE, offsets.readInt());
Assert.assertEquals(fullContents.length, offsets.readLong());
Assert.assertEquals(2, offsets.readInt());
Assert.assertEquals(0, offsets.readLong());
int offset = (int) offsets.readLong();
byte[] compressed = readFileToByteArray(file);
byte[] uncompressed = new byte[fullContents.length];
LZ4Compressor.instance.uncompress(compressed, 0, offset - 4, uncompressed, 0);
LZ4Compressor.instance.uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length);
Assert.assertTrue(Arrays.equals(fullContents, uncompressed));
}
protected void assertAborted() throws Exception
{
super.assertAborted();
}
void cleanup()
{
file.delete();
offsetsFile.delete();
}
}
}