blob: 39c96898055daa59d108e8ffdcb936051dcb5545 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import com.google.common.primitives.Ints;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.ChecksumType;
import static junit.framework.Assert.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class MmappedRegionsTest
{
private static final Logger logger = LoggerFactory.getLogger(MmappedRegionsTest.class);
@BeforeClass
public static void setupDD()
{
DatabaseDescriptor.daemonInitialization();
}
private static ByteBuffer allocateBuffer(int size)
{
ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size));
long seed = System.nanoTime();
//seed = 365238103404423L;
logger.info("Seed {}", seed);
new Random(seed).nextBytes(ret.array());
return ret;
}
private static File writeFile(String fileName, ByteBuffer buffer) throws IOException
{
File ret = File.createTempFile(fileName, "1");
ret.deleteOnExit();
try (SequentialWriter writer = new SequentialWriter(ret))
{
writer.write(buffer);
writer.finish();
}
assert ret.exists();
assert ret.length() >= buffer.capacity();
return ret;
}
@Test
public void testEmpty() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
try(ChannelProxy channel = new ChannelProxy(writeFile("testEmpty", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
assertTrue(regions.isEmpty());
assertTrue(regions.isValid(channel));
}
}
@Test
public void testTwoSegments() throws Exception
{
ByteBuffer buffer = allocateBuffer(2048);
try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(1024);
for (int i = 0; i < 1024; i++)
{
MmappedRegions.Region region = regions.floor(i);
assertNotNull(region);
assertEquals(0, region.offset());
assertEquals(1024, region.end());
}
regions.extend(2048);
for (int i = 0; i < 2048; i++)
{
MmappedRegions.Region region = regions.floor(i);
assertNotNull(region);
if (i < 1024)
{
assertEquals(0, region.offset());
assertEquals(1024, region.end());
}
else
{
assertEquals(1024, region.offset());
assertEquals(2048, region.end());
}
}
}
}
@Test
public void testSmallSegmentSize() throws Exception
{
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
MmappedRegions.MAX_SEGMENT_SIZE = 1024;
ByteBuffer buffer = allocateBuffer(4096);
try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(1024);
regions.extend(2048);
regions.extend(4096);
final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
for (int i = 0; i < buffer.capacity(); i++)
{
MmappedRegions.Region region = regions.floor(i);
assertNotNull(region);
assertEquals(SIZE * (i / SIZE), region.offset());
assertEquals(SIZE + (SIZE * (i / SIZE)), region.end());
}
}
finally
{
MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
}
}
@Test
public void testAllocRegions() throws Exception
{
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
MmappedRegions.MAX_SEGMENT_SIZE = 1024;
ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3);
try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(buffer.capacity());
final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
for (int i = 0; i < buffer.capacity(); i++)
{
MmappedRegions.Region region = regions.floor(i);
assertNotNull(region);
assertEquals(SIZE * (i / SIZE), region.offset());
assertEquals(SIZE + (SIZE * (i / SIZE)), region.end());
}
}
finally
{
MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
}
}
@Test
public void testCopy() throws Exception
{
ByteBuffer buffer = allocateBuffer(128 * 1024);
MmappedRegions snapshot;
ChannelProxy channelCopy;
try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer));
MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4))
{
// create 3 more segments, one per quater capacity
regions.extend(buffer.capacity() / 2);
regions.extend(3 * buffer.capacity() / 4);
regions.extend(buffer.capacity());
// make a snapshot
snapshot = regions.sharedCopy();
// keep the channel open
channelCopy = channel.sharedCopy();
}
assertFalse(snapshot.isCleanedUp());
final int SIZE = buffer.capacity() / 4;
for (int i = 0; i < buffer.capacity(); i++)
{
MmappedRegions.Region region = snapshot.floor(i);
assertNotNull(region);
assertEquals(SIZE * (i / SIZE), region.offset());
assertEquals(SIZE + (SIZE * (i / SIZE)), region.end());
// check we can access the buffer
assertNotNull(region.buffer.duplicate().getInt());
}
assertNull(snapshot.close(null));
assertNull(channelCopy.close(null));
assertTrue(snapshot.isCleanedUp());
}
@Test(expected = AssertionError.class)
public void testCopyCannotExtend() throws Exception
{
ByteBuffer buffer = allocateBuffer(128 * 1024);
MmappedRegions snapshot;
ChannelProxy channelCopy;
try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(buffer.capacity() / 2);
// make a snapshot
snapshot = regions.sharedCopy();
// keep the channel open
channelCopy = channel.sharedCopy();
}
try
{
snapshot.extend(buffer.capacity());
}
finally
{
assertNull(snapshot.close(null));
assertNull(channelCopy.close(null));
}
}
@Test
public void testExtendOutOfOrder() throws Exception
{
ByteBuffer buffer = allocateBuffer(4096);
try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(4096);
regions.extend(1024);
regions.extend(2048);
for (int i = 0; i < buffer.capacity(); i++)
{
MmappedRegions.Region region = regions.floor(i);
assertNotNull(region);
assertEquals(0, region.offset());
assertEquals(4096, region.end());
}
}
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeExtend() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(-1);
}
}
@Test
public void testMapForCompressionMetadata() throws Exception
{
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
MmappedRegions.MAX_SEGMENT_SIZE = 1024;
ByteBuffer buffer = allocateBuffer(128 * 1024);
File f = File.createTempFile("testMapForCompressionMetadata", "1");
f.deleteOnExit();
File cf = File.createTempFile(f.getName() + ".metadata", "1");
cf.deleteOnExit();
MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
try(SequentialWriter writer = new CompressedSequentialWriter(f, cf.getAbsolutePath(),
null, SequentialWriterOption.DEFAULT,
CompressionParams.snappy(), sstableMetadataCollector))
{
writer.write(buffer);
writer.finish();
}
CompressionMetadata metadata = new CompressionMetadata(cf.getAbsolutePath(), f.length(), ChecksumType.CRC32);
try(ChannelProxy channel = new ChannelProxy(f);
MmappedRegions regions = MmappedRegions.map(channel, metadata))
{
assertFalse(regions.isEmpty());
int i = 0;
while(i < buffer.capacity())
{
CompressionMetadata.Chunk chunk = metadata.chunkFor(i);
MmappedRegions.Region region = regions.floor(chunk.offset);
assertNotNull(region);
ByteBuffer compressedChunk = region.buffer.duplicate();
assertNotNull(compressedChunk);
assertEquals(chunk.length + 4, compressedChunk.capacity());
assertEquals(chunk.offset, region.offset());
assertEquals(chunk.offset + chunk.length + 4, region.end());
i += metadata.chunkLength();
}
}
finally
{
MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
metadata.close();
}
}
@Test(expected = IllegalArgumentException.class)
public void testIllegalArgForMap1() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer));
MmappedRegions regions = MmappedRegions.map(channel, 0))
{
assertTrue(regions.isEmpty());
}
}
@Test(expected = IllegalArgumentException.class)
public void testIllegalArgForMap2() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer));
MmappedRegions regions = MmappedRegions.map(channel, -1L))
{
assertTrue(regions.isEmpty());
}
}
@Test(expected = IllegalArgumentException.class)
public void testIllegalArgForMap3() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap3", buffer));
MmappedRegions regions = MmappedRegions.map(channel, null))
{
assertTrue(regions.isEmpty());
}
}
}