blob: 051b883b1fc56b4ead8ebc70418560683e8a213b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.compress;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
/**
* Compression processor.
*
* @see IgniteComponentType#COMPRESSION
*/
public class CompressionProcessor extends GridProcessorAdapter {
/** */
public static final int LZ4_MIN_LEVEL = 0;
/** */
public static final int LZ4_MAX_LEVEL = 17;
/** */
public static final int LZ4_DEFAULT_LEVEL = 0;
/** */
public static final int ZSTD_MIN_LEVEL = -131072;
/** */
public static final int ZSTD_MAX_LEVEL = 22;
/** */
public static final int ZSTD_DEFAULT_LEVEL = 3;
/** */
public static final byte UNCOMPRESSED_PAGE = 0;
/** */
protected static final byte COMPACTED_PAGE = 1;
/** */
protected static final byte ZSTD_COMPRESSED_PAGE = 2;
/** */
protected static final byte LZ4_COMPRESSED_PAGE = 3;
/** */
protected static final byte SNAPPY_COMPRESSED_PAGE = 4;
/** Max page size. */
private final ThreadLocalDirectByteBuffer compactBuf = new ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);
/**
* @param ctx Kernal context.
*/
public CompressionProcessor(GridKernalContext ctx) {
super(ctx);
}
/**
* @param compressLevel Compression level.
* @param compression Compression algorithm.
* @return Compression level.
*/
public static int getCompressionLevel(Integer compressLevel, DiskPageCompression compression) {
return compressLevel != null ? checkCompressionLevelBounds(compressLevel, compression) :
getDefaultCompressionLevel(compression);
}
/**
* @param compression Compression algorithm.
* @return Default compression level.
*/
public static int getDefaultCompressionLevel(DiskPageCompression compression) {
switch (compression) {
case ZSTD:
return ZSTD_DEFAULT_LEVEL;
case LZ4:
return LZ4_DEFAULT_LEVEL;
case SNAPPY:
case SKIP_GARBAGE:
case DISABLED:
return 0;
}
throw new IllegalArgumentException("Compression: " + compression);
}
/**
* @param compressLevel Compression level.
* @param compression Compression algorithm.
* @return Compression level.
*/
public static int checkCompressionLevelBounds(int compressLevel, DiskPageCompression compression) {
switch (compression) {
case ZSTD:
checkCompressionLevelBounds(compressLevel, ZSTD_MIN_LEVEL, ZSTD_MAX_LEVEL);
break;
case LZ4:
checkCompressionLevelBounds(compressLevel, LZ4_MIN_LEVEL, LZ4_MAX_LEVEL);
break;
default:
throw new IllegalArgumentException("Compression level for " + compression + " is not supported.");
}
return compressLevel;
}
/**
* @param compressLevel Compression level.
* @param min Min level.
* @param max Max level.
*/
private static void checkCompressionLevelBounds(int compressLevel, int min, int max) {
if (compressLevel < min || compressLevel > max) {
throw new IllegalArgumentException("Compression level for LZ4 must be between " + min +
" and " + max + ".");
}
}
/**
* @throws IgniteCheckedException Always.
*/
private static <T> T fail() throws IgniteCheckedException {
throw new IgniteCheckedException("Make sure that ignite-compress module is in classpath.");
}
/**
* Checks weither page compression can be used for page file storage.
*
* @throws IgniteCheckedException If compression is not supported.
*/
public void checkPageCompressionSupported() throws IgniteCheckedException {
fail();
}
/**
* Checks weither page file storage supports compression.
*
* @param storagePath Storage path.
* @param pageSize Page size.
* @throws IgniteCheckedException If compression is not supported.
*/
public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException {
fail();
}
/**
* @param page Page.
* @param compactSize Compacted page size.
* @return The given page.
*/
protected static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) {
return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize);
}
/**
* @param page Page.
* @param compression Compression algorithm.
* @param compressedSize Compressed size.
* @param compactedSize Compact size.
* @return The given page.
*/
protected static ByteBuffer setCompressionInfo(
ByteBuffer page,
DiskPageCompression compression,
int compressedSize,
int compactedSize
) {
assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : compressedSize;
assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : compactedSize;
PageIO.setCompressionType(page, getCompressionType(compression));
PageIO.setCompressedSize(page, (short)compressedSize);
PageIO.setCompactedSize(page, (short)compactedSize);
return page;
}
/**
* @param compression Compression.
* @return Level.
*/
private static byte getCompressionType(DiskPageCompression compression) {
if (compression == DiskPageCompression.DISABLED)
return UNCOMPRESSED_PAGE;
switch (compression) {
case ZSTD:
return ZSTD_COMPRESSED_PAGE;
case LZ4:
return LZ4_COMPRESSED_PAGE;
case SNAPPY:
return SNAPPY_COMPRESSED_PAGE;
case SKIP_GARBAGE:
return COMPACTED_PAGE;
}
throw new IllegalStateException("Unexpected compression: " + compression);
}
/**
* @param page Page buffer.
* @param pageSize Page size.
* @param blockSize Store block size.
* @param compression Compression algorithm.
* @param compressLevel Compression level.
* @return Possibly compressed buffer.
* @throws IgniteCheckedException If failed.
*/
public ByteBuffer compressPage(
ByteBuffer page,
int pageSize,
int blockSize,
DiskPageCompression compression,
int compressLevel
) throws IgniteCheckedException {
assert compression != null && compression != DiskPageCompression.DISABLED : compression;
assert U.isPow2(blockSize) : blockSize;
assert page.position() == 0 && page.limit() >= pageSize;
int oldPageLimit = page.limit();
try {
// Page size will be less than page limit when TDE is enabled. To make compaction and compression work
// correctly we need to set limit to real page size.
page.limit(pageSize);
ByteBuffer compactPage = doCompactPage(page, pageSize);
int compactSize = compactPage.limit();
assert compactSize <= pageSize : compactSize;
// If no need to compress further or configured just to skip garbage.
if (compactSize < blockSize || compression == SKIP_GARBAGE)
return setCompactionInfo(compactPage, compactSize);
ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);
assert compressedPage.position() == 0;
int compressedSize = compressedPage.limit();
int freeCompactBlocks = (pageSize - compactSize) / blockSize;
int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
if (freeCompactBlocks >= freeCompressedBlocks) {
if (freeCompactBlocks == 0)
return page; // No blocks will be released.
return setCompactionInfo(compactPage, compactSize);
}
return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
}
finally {
page.limit(oldPageLimit);
}
}
/**
* @param page Page buffer.
* @param pageSize Page size.
* @return Compacted page buffer.
*/
protected ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
PageIO io = PageIO.getPageIO(page);
ByteBuffer compactPage = compactBuf.get();
if (io instanceof CompactablePageIO) {
// Drop the garbage from the page.
((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
}
else {
// Direct buffer is required as output of this method.
if (page.isDirect())
return page;
PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array());
compactPage.limit(pageSize);
}
return compactPage;
}
/**
* @param compression Compression algorithm.
* @param compactPage Compacted page.
* @param compactSize Compacted page size.
* @param compressLevel Compression level.
* @return Compressed page.
*/
protected ByteBuffer doCompressPage(
DiskPageCompression compression,
ByteBuffer compactPage,
int compactSize,
int compressLevel
) {
throw new IllegalStateException("Unsupported compression: " + compression);
}
/**
* @param page Possibly compressed page buffer.
* @param pageSize Page size.
* @throws IgniteCheckedException If failed.
*/
public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize;
byte compressType = PageIO.getCompressionType(page);
if (compressType == UNCOMPRESSED_PAGE)
return; // Nothing to do.
short compressedSize = PageIO.getCompressedSize(page);
short compactSize = PageIO.getCompactedSize(page);
assert compactSize <= pageSize && compactSize >= compressedSize;
if (compressType == COMPACTED_PAGE) {
// Just setup bounds before restoring the page.
page.position(0).limit(compactSize);
}
else
doDecompressPage(compressType, page, compressedSize, compactSize);
PageIO io = PageIO.getPageIO(page);
if (io instanceof CompactablePageIO)
((CompactablePageIO)io).restorePage(page, pageSize);
else {
assert compactSize == pageSize
: "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']';
}
setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
}
/** */
protected void doDecompressPage(int compressType, ByteBuffer page, int compressedSize, int compactSize) {
throw new IllegalStateException("Unsupported compression: " + compressType);
}
}