blob: 245fd76db59f4d23d31602059dee66caf990ebaa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* ---------------------------------------------------------------------
*
* Interfaces to low level compression functionality.
*/
#include "postgres.h"
#include "fmgr.h"
#include "access/genam.h"
#include "access/reloptions.h"
#include "access/tupdesc.h"
#include "access/tupmacs.h"
#include "catalog/catquery.h"
#include "catalog/pg_attribute_encoding.h"
#include "catalog/pg_compression.h"
#include "catalog/dependency.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "nodes/makefuncs.h"
#include "parser/parse_type.h"
#include "storage/gp_compress.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "snappy-c.h"
/*
 * Option names recognised as storage directives inside an ENCODING clause.
 * The list is terminated by a NULL entry so it can be walked without a
 * separate length.
 */
char *storage_directive_names[] = {
	"compresstype",
	"compresslevel",
	"blocksize",
	NULL
};
/*
 * Private per-stream state for the zlib handlers.  zlib_constructor allocates
 * one of these and stores it in CompressionState->opaque.
 */
typedef struct zlib_state zlib_state;

struct zlib_state
{
	int			level;			/* level handed to compress_fn */
	bool		compress;		/* built for compressing (true) or
								 * decompressing (false)? */

	/* Entry points used by zlib_compress / zlib_decompress. */
	int			(*compress_fn) (Bytef *dest, uLongf *destLen,
								const Bytef *source, uLong sourceLen,
								int level);
	int			(*decompress_fn) (Bytef *dest, uLongf *destLen,
								  const Bytef *source, uLong sourceLen);
};
/*
 * comptype_to_name
 *		Convert a user-supplied compression type into a lower-cased NameData.
 *
 * Catalog lookups of compression types are case insensitive; this helper
 * performs the case folding.  The result is fully zero-padded, so it is safe
 * to compare as a fixed-width name.  Raises ERROR if the name (before or
 * after case folding) does not fit in NAMEDATALEN - 1 bytes.
 */
static NameData
comptype_to_name(char *comptype)
{
	char	   *dct;			/* down cased comptype */
	size_t		len;
	NameData	compname;

	len = strlen(comptype);
	if (len >= NAMEDATALEN)
		elog(ERROR, "compression name \"%s\" exceeds maximum name length "
			 "of %d bytes", comptype, NAMEDATALEN - 1);

	dct = str_tolower(comptype, len);

	/*
	 * Case folding may change the byte length in some encodings, so check
	 * again before copying into the fixed-size buffer.
	 */
	len = strlen(dct);
	if (len >= NAMEDATALEN)
		elog(ERROR, "compression name \"%s\" exceeds maximum name length "
			 "of %d bytes", comptype, NAMEDATALEN - 1);

	/* Zero the whole buffer so no stack garbage follows the terminator. */
	memset(&compname, 0, sizeof(compname));
	memcpy(NameStr(compname), dct, len);

	pfree(dct);
	return compname;
}
/*
 * GetCompressionImplementation
 *		Find the handler functions for compression type 'comptype'.
 *
 * The name is lower-cased by comptype_to_name() before matching, so the
 * comparison is case insensitive.  Returns a palloc'd array of
 * NUM_COMPRESS_FUNCS function pointers indexed by COMPRESSION_CONSTRUCTOR,
 * COMPRESSION_DESTRUCTOR, COMPRESSION_COMPRESS, COMPRESSION_DECOMPRESS and
 * COMPRESSION_VALIDATOR.  Raises ERROR when no pg_compression row matches.
 */
PGFunction *
GetCompressionImplementation(char *comptype)
{
	NameData	compname = comptype_to_name(comptype);
	PGFunction *funcs;

	if (strcmp(NameStr(compname), "snappy") == 0)
	{
		/*
		 * HACK: snappy support for row-oriented storage was added without
		 * touching the system catalogs, to keep upgrades simple in the short
		 * term.  Its handlers are therefore wired in here directly.  Remove
		 * this branch once snappy is registered in pg_compression.
		 */
		funcs = palloc0(sizeof(PGFunction) * NUM_COMPRESS_FUNCS);
		funcs[COMPRESSION_CONSTRUCTOR] = snappy_constructor;
		funcs[COMPRESSION_DESTRUCTOR] = snappy_destructor;
		funcs[COMPRESSION_COMPRESS] = snappy_compress_internal;
		funcs[COMPRESSION_DECOMPRESS] = snappy_decompress_internal;
		funcs[COMPRESSION_VALIDATOR] = snappy_validator;
	}
	else
	{
		HeapTuple	tuple;
		Form_pg_compression ctup;
		FmgrInfo	finfo;

		tuple = caql_getfirst(
				NULL,
				cql("SELECT * FROM pg_compression "
					" WHERE compname = :1 ",
					NameGetDatum(&compname)));

		if (!HeapTupleIsValid(tuple))
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
					 errmsg("unknown compress type \"%s\"",
							comptype)));

		funcs = palloc0(sizeof(PGFunction) * NUM_COMPRESS_FUNCS);
		ctup = (Form_pg_compression) GETSTRUCT(tuple);

		/* Resolve each catalog-registered handler OID to a C entry point. */
		Insist(OidIsValid(ctup->compconstructor));
		fmgr_info(ctup->compconstructor, &finfo);
		funcs[COMPRESSION_CONSTRUCTOR] = finfo.fn_addr;

		Insist(OidIsValid(ctup->compdestructor));
		fmgr_info(ctup->compdestructor, &finfo);
		funcs[COMPRESSION_DESTRUCTOR] = finfo.fn_addr;

		Insist(OidIsValid(ctup->compcompressor));
		fmgr_info(ctup->compcompressor, &finfo);
		funcs[COMPRESSION_COMPRESS] = finfo.fn_addr;

		Insist(OidIsValid(ctup->compdecompressor));
		fmgr_info(ctup->compdecompressor, &finfo);
		funcs[COMPRESSION_DECOMPRESS] = finfo.fn_addr;

		Insist(OidIsValid(ctup->compvalidator));
		fmgr_info(ctup->compvalidator, &finfo);
		funcs[COMPRESSION_VALIDATOR] = finfo.fn_addr;
	}

	return funcs;
}
/*
 * callCompressionConstructor
 *		Call a compression constructor via the fmgr direct-call interface.
 *
 * 'is_compress' selects whether the state is built for compressing or
 * decompressing.  Returns the CompressionState produced by the constructor.
 */
CompressionState *
callCompressionConstructor(PGFunction constructor,
						   TupleDesc tupledesc,
						   StorageAttributes *sa,
						   bool is_compress)
{
	Datum		result;

	result = DirectFunctionCall3(constructor,
								 PointerGetDatum(tupledesc),
								 PointerGetDatum(sa),
								 BoolGetDatum(is_compress));

	return (CompressionState *) DatumGetPointer(result);
}
/*
 * callCompressionDestructor
 *		Call a compression destructor on 'state'; its result is discarded.
 */
void
callCompressionDestructor(PGFunction destructor, CompressionState *state)
{
	Datum		state_arg = PointerGetDatum(state);

	(void) DirectFunctionCall1(destructor, state_arg);
}
/*
 * callCompressionActuator
 *		Run a compress or decompress handler over one buffer.
 *
 * Reads src_sz bytes at 'src' and writes into 'dst' (capacity dst_sz).  The
 * handler reports the number of bytes it produced through *dst_used.  Its
 * Datum result is ignored.
 */
void
callCompressionActuator(PGFunction func, const void *src, int32 src_sz,
						char *dst, int32 dst_sz, int32 *dst_used,
						CompressionState *state)
{
	Datum		in_buf = PointerGetDatum(src);
	Datum		in_len = Int32GetDatum(src_sz);
	Datum		out_buf = PointerGetDatum(dst);
	Datum		out_cap = Int32GetDatum(dst_sz);
	Datum		out_len = PointerGetDatum(dst_used);
	Datum		cstate = PointerGetDatum(state);

	(void) DirectFunctionCall6(func, in_buf, in_len, out_buf, out_cap,
							   out_len, cstate);
}
/*
 * callCompressionValidator
 *		Pack the given storage settings into a StorageAttributes on the stack
 *		and hand it to the validator 'func'.  The result is discarded.
 */
void
callCompressionValidator(PGFunction func, char *comptype, int32 complevel,
						 int32 blocksize, Oid typid)
{
	StorageAttributes sa;

	sa.typid = typid;
	sa.blocksize = blocksize;
	sa.complevel = complevel;
	sa.comptype = comptype;

	(void) DirectFunctionCall1(func, PointerGetDatum(&sa));
}
/*
 * zlib_constructor
 *		Build a CompressionState for zlib.
 *
 * Args: 0 = TupleDesc, 1 = StorageAttributes *, 2 = bool (compress?).
 * A complevel of 0 is replaced by 1.  Note that this replacement is written
 * back into the caller's StorageAttributes.
 */
Datum
zlib_constructor(PG_FUNCTION_ARGS)
{
	TupleDesc	td = PG_GETARG_POINTER(0);
	StorageAttributes *sa = PG_GETARG_POINTER(1);
	bool		compress = PG_GETARG_BOOL(2);
	CompressionState *cs;
	zlib_state *zs;

	cs = palloc0(sizeof(CompressionState));
	zs = palloc0(sizeof(zlib_state));

	cs->opaque = (void *) zs;
	cs->desired_sz = NULL;

	Insist(PointerIsValid(td));
	Insist(PointerIsValid(sa->comptype));

	if (sa->complevel == 0)
		sa->complevel = 1;

	zs->level = sa->complevel;
	zs->compress = compress;
	zs->compress_fn = compress2;
	zs->decompress_fn = uncompress;

	PG_RETURN_POINTER(cs);
}
Datum
zlib_destructor(PG_FUNCTION_ARGS)
{
CompressionState *cs = PG_GETARG_POINTER(0);
Insist(PointerIsValid(cs->opaque));
pfree(cs->opaque);
PG_RETURN_VOID();
}
/*
 * zlib_compress
 *		Compress src (src_sz bytes) into dst (dst_sz bytes) with zlib.
 *
 * Args: 0 src, 1 src_sz, 2 dst, 3 dst_sz, 4 int32 *dst_used (out),
 * 5 CompressionState * from zlib_constructor.
 *
 * If the output does not fit in dst (Z_BUF_ERROR), *dst_used is set to
 * src_sz.  This lets the caller detect that compression did not help.
 * Z_MEM_ERROR raises ERROR.
 */
Datum
zlib_compress(PG_FUNCTION_ARGS)
{
	const void *src = PG_GETARG_POINTER(0);
	int32		src_sz = PG_GETARG_INT32(1);
	void	   *dst = PG_GETARG_POINTER(2);
	int32		dst_sz = PG_GETARG_INT32(3);
	int32	   *dst_used = PG_GETARG_POINTER(4);
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(5);
	zlib_state *zs = (zlib_state *) cs->opaque;
	unsigned long out_len = dst_sz;
	int			rc;

	rc = zs->compress_fn((unsigned char *) dst, &out_len, src, src_sz,
						 zs->level);
	*dst_used = out_len;

	if (rc == Z_OK)
		PG_RETURN_VOID();

	if (rc == Z_MEM_ERROR)
		elog(ERROR, "out of memory");
	else if (rc == Z_BUF_ERROR)
	{
		/* Output would not fit: report "no savings" to the caller. */
		*dst_used = src_sz;
	}
	else
	{
		/* shouldn't get here */
		Insist(false);
	}

	PG_RETURN_VOID();
}
/*
 * zlib_decompress
 *		Decompress src (src_sz bytes) into dst (dst_sz bytes) with zlib.
 *
 * Args: 0 src, 1 src_sz, 2 dst, 3 dst_sz, 4 int32 *dst_used (out),
 * 5 CompressionState * from zlib_constructor.
 *
 * *dst_used receives the number of bytes produced.  Any zlib failure raises
 * ERROR.  The caller is responsible for providing a dst buffer large enough
 * for the uncompressed data.
 */
Datum
zlib_decompress(PG_FUNCTION_ARGS)
{
	const char *src = PG_GETARG_POINTER(0);
	int32		src_sz = PG_GETARG_INT32(1);
	void	   *dst = PG_GETARG_POINTER(2);
	int32		dst_sz = PG_GETARG_INT32(3);
	int32	   *dst_used = PG_GETARG_POINTER(4);
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(5);
	zlib_state *state = (zlib_state *) cs->opaque;
	int			last_error;
	unsigned long amount_available_used = dst_sz;

	Insist(src_sz > 0 && dst_sz > 0);

	last_error = state->decompress_fn(dst, &amount_available_used,
									  (const Bytef *) src, src_sz);
	*dst_used = amount_available_used;

	if (last_error != Z_OK)
	{
		switch (last_error)
		{
			case Z_MEM_ERROR:
				elog(ERROR, "out of memory");
				break;

			case Z_BUF_ERROR:

				/*
				 * This would be a bug: the caller should have supplied a
				 * buffer big enough for the decompressed data.
				 */
				elog(ERROR, "buffer size %d insufficient for compressed data",
					 dst_sz);
				break;

			case Z_DATA_ERROR:
				/* The input is corrupt or not zlib data. */
				elog(ERROR, "zlib encountered data in an unexpected format");
				break;

			default:
				/* shouldn't get here */
				Insist(false);
				break;
		}
	}

	PG_RETURN_VOID();
}
Datum
zlib_validator(PG_FUNCTION_ARGS)
{
PG_RETURN_VOID();
}
/*
 * snappy_constructor
 *		Build a CompressionState for snappy.
 *
 * Snappy needs no private state, so opaque is left NULL.  desired_sz is
 * pointed at snappy_max_compressed_length.  Presumably callers use it to
 * size output buffers; verify against the CompressionState consumers.
 */
Datum
snappy_constructor(PG_FUNCTION_ARGS)
{
	TupleDesc	td = PG_GETARG_POINTER(0);
	StorageAttributes *sa = PG_GETARG_POINTER(1);
	CompressionState *cs;

	cs = palloc0(sizeof(CompressionState));
	cs->desired_sz = snappy_max_compressed_length;
	cs->opaque = NULL;

	Insist(PointerIsValid(td));
	Insist(PointerIsValid(sa->comptype));

	PG_RETURN_POINTER(cs);
}
Datum
snappy_destructor(PG_FUNCTION_ARGS)
{
CompressionState *cs = PG_GETARG_POINTER(0);
if (cs->opaque)
{
Insist(PointerIsValid(cs->opaque));
pfree(cs->opaque);
}
PG_RETURN_VOID();
}
/*
 * elog_snappy_error
 *		Raise ERROR describing a failed snappy call.
 *
 * 'func_name' names the snappy routine that failed.  The size arguments are
 * included in the message for diagnosis.  Every branch calls elog(ERROR), so
 * this function does not return.
 */
static void
elog_snappy_error(snappy_status retval, char *func_name,
				  int src_sz, int dst_sz, int dst_used)
{
	if (retval == SNAPPY_INVALID_INPUT)
		elog(ERROR, "invalid input for %s(): "
			 "src_sz=%d dst_sz=%d dst_used=%d",
			 func_name, src_sz, dst_sz, dst_used);
	else if (retval == SNAPPY_BUFFER_TOO_SMALL)
		elog(ERROR, "buffer is too small in %s(): "
			 "src_sz=%d dst_sz=%d dst_used=%d",
			 func_name, src_sz, dst_sz, dst_used);
	else
		elog(ERROR, "unknown failure (return value %d) for %s(): "
			 "src_sz=%d dst_sz=%d dst_used=%d", retval, func_name,
			 src_sz, dst_sz, dst_used);
}
/*
 * snappy_compress_internal
 *		Compress src (src_sz bytes) into dst (dst_sz bytes) with snappy.
 *
 * Args: 0 src, 1 src_sz, 2 dst, 3 dst_sz, 4 int32 *dst_used (out).
 * dst must hold at least snappy_max_compressed_length(src_sz) bytes; this is
 * enforced with Insist.  *dst_used receives the compressed size, and any
 * snappy failure raises ERROR.
 */
Datum
snappy_compress_internal(PG_FUNCTION_ARGS)
{
	const char *src = PG_GETARG_POINTER(0);
	size_t		src_sz = PG_GETARG_INT32(1);
	char	   *dst = PG_GETARG_POINTER(2);
	size_t		dst_sz = PG_GETARG_INT32(3);
	int32	   *dst_used = PG_GETARG_POINTER(4);
	size_t		compressed_length = snappy_max_compressed_length(src_sz);
	snappy_status retval;

	Insist(dst_sz >= compressed_length);

	/* On entry compressed_length is dst's usable size; on exit, bytes written. */
	retval = snappy_compress(src, src_sz, dst, &compressed_length);
	*dst_used = compressed_length;

	if (retval == SNAPPY_OK)
		PG_RETURN_VOID();

	elog_snappy_error(retval, "snappy_compress", src_sz, dst_sz, *dst_used);
	PG_RETURN_VOID();
}
/*
 * snappy_decompress_internal
 *		Decompress src (src_sz bytes) into dst (dst_sz bytes) with snappy.
 *
 * Args: 0 src, 1 src_sz, 2 dst, 3 dst_sz, 4 int32 *dst_used (out).
 * The uncompressed length is read from the compressed stream first, and dst
 * must be large enough to hold it (enforced with Insist).  *dst_used
 * receives the decompressed size, and any snappy failure raises ERROR.
 */
Datum
snappy_decompress_internal(PG_FUNCTION_ARGS)
{
	const char *src = PG_GETARG_POINTER(0);
	size_t		src_sz = PG_GETARG_INT32(1);
	char	   *dst = PG_GETARG_POINTER(2);
	size_t		dst_sz = PG_GETARG_INT32(3);
	int32	   *dst_used = PG_GETARG_POINTER(4);
	size_t		uncompressed_length;
	snappy_status retval;

	Insist(src_sz > 0 && dst_sz > 0);

	retval = snappy_uncompressed_length(src, src_sz,
										&uncompressed_length);

	/*
	 * Nothing has been written to *dst_used yet, and it may be uninitialised
	 * caller memory.  So report 0 rather than reading it.
	 */
	if (retval != SNAPPY_OK)
		elog_snappy_error(retval, "snappy_uncompressed_length",
						  src_sz, dst_sz, 0);

	Insist(dst_sz >= uncompressed_length);

	retval = snappy_uncompress(src, src_sz, dst, &uncompressed_length);
	*dst_used = uncompressed_length;

	if (retval != SNAPPY_OK)
		elog_snappy_error(retval, "snappy_uncompress",
						  src_sz, dst_sz, *dst_used);

	PG_RETURN_VOID();
}
Datum
snappy_validator(PG_FUNCTION_ARGS)
{
PG_RETURN_VOID();
}
Datum
rle_type_constructor(PG_FUNCTION_ARGS)
{
elog(ERROR, "rle_type block compression not supported");
PG_RETURN_VOID();
}
Datum
rle_type_destructor(PG_FUNCTION_ARGS)
{
elog(ERROR, "rle_type block compression not supported");
PG_RETURN_VOID();
}
Datum
rle_type_compress(PG_FUNCTION_ARGS)
{
elog(ERROR, "rle_type block compression not supported");
PG_RETURN_VOID();
}
Datum
rle_type_decompress(PG_FUNCTION_ARGS)
{
elog(ERROR, "rle_type block compression not supported");
PG_RETURN_VOID();
}
Datum
rle_type_validator(PG_FUNCTION_ARGS)
{
elog(ERROR, "rle_type block compression not supported");
PG_RETURN_VOID();
}
/* Dummy routines to implement compresstype=none */
Datum
dummy_compression_constructor(PG_FUNCTION_ARGS)
{
elog(ERROR, "dummy compression called directly");
PG_RETURN_VOID();
}
Datum
dummy_compression_destructor(PG_FUNCTION_ARGS)
{
elog(ERROR, "dummy compression called directly");
PG_RETURN_VOID();
}
Datum
dummy_compression_compress(PG_FUNCTION_ARGS)
{
elog(ERROR, "dummy compression called directly");
PG_RETURN_VOID();
}
Datum
dummy_compression_decompress(PG_FUNCTION_ARGS)
{
elog(ERROR, "dummy compression called directly");
PG_RETURN_VOID();
}
Datum
dummy_compression_validator(PG_FUNCTION_ARGS)
{
elog(ERROR, "dummy compression called directly");
PG_RETURN_VOID();
}
/*
 * compresstype_is_valid
 *		Does a compression algorithm exist by the name of 'comptype'?
 *
 * The name is matched case-insensitively, consistent with
 * GetCompressionImplementation().  It is valid if it has a row in
 * pg_compression, or if it is one of the names special-cased below.
 */
bool
compresstype_is_valid(char *comptype)
{
	NameData	compname;
	bool		found;

	compname = comptype_to_name(comptype);

	found = (0 !=
			 caql_getcount(
					NULL,
					cql("SELECT COUNT(*) FROM pg_compression "
						" WHERE compname = :1 ",
						NameGetDatum(&compname))));

	/*
	 * FIXME: This is a hack.  The related handlers should be implemented and
	 * registered in the system tables instead.  The snappy handlers are
	 * already implemented but not registered (see the comment in
	 * GetCompressionImplementation() for details).
	 *
	 * Compare the lower-cased name, not the raw input, so that e.g. "SNAPPY"
	 * is accepted just as the catalog lookup above would accept "ZLIB".
	 */
	if (!found &&
		(strcmp(NameStr(compname), "snappy") == 0 ||
		 strcmp(NameStr(compname), "gzip") == 0))
		found = true;

	return found;
}
/*
 * default_column_encoding_clause
 *		Build the ENCODING option list used for a column that has no explicit
 *		encoding.
 *
 * Returns a new List of three DefElems, in this order:
 *   compresstype = none
 *   blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE
 *   compresslevel = 0
 */
List *
default_column_encoding_clause(void)
{
	DefElem    *comptype_opt;
	DefElem    *blocksize_opt;
	DefElem    *complevel_opt;

	comptype_opt = makeDefElem("compresstype",
							   (Node *) makeString("none"));
	blocksize_opt = makeDefElem("blocksize",
								(Node *) makeInteger(DEFAULT_APPENDONLY_BLOCK_SIZE));
	complevel_opt = makeDefElem("compresslevel",
								(Node *) makeInteger(0));

	return list_make3(comptype_opt, blocksize_opt, complevel_opt);
}
/*
 * is_storage_encoding_directive
 *		Return true if 'name' exactly (case-sensitively) matches one of the
 *		entries in storage_directive_names.
 */
bool
is_storage_encoding_directive(char *name)
{
	char	  **directive;

	for (directive = storage_directive_names; *directive != NULL; directive++)
	{
		if (strcmp(name, *directive) == 0)
			return true;
	}

	return false;
}