blob: 4a2502b5e6402344fa44cb6f353e524db7297de7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
#include <string.h>
#ifdef SNAPPY_CODEC
#include <snappy-c.h>
# if defined(__APPLE__)
# include <libkern/OSByteOrder.h>
# define __bswap_32 OSSwapInt32
# else
# include <byteswap.h>
# endif
#endif
#ifdef DEFLATE_CODEC
#include <zlib.h>
#endif
#ifdef LZMA_CODEC
#include <lzma.h>
#endif
#include "avro/errors.h"
#include "avro/allocation.h"
#include "codec.h"
/* Initial size, in bytes, of the output buffer that the deflate and lzma
 * decoders allocate when the codec has none yet; both double the buffer
 * whenever it turns out to be too small. */
#define DEFAULT_BLOCK_SIZE (16 * 1024)
/* NULL codec */
/*
 * Set up `codec` as the pass-through "null" codec. It starts with no
 * output buffer and no private state. Always returns 0.
 */
static int
codec_null(avro_codec_t codec)
{
	codec->codec_data = NULL;
	codec->block_data = NULL;
	codec->used_size = 0;
	codec->block_size = 0;
	codec->type = AVRO_CODEC_NULL;
	codec->name = "null";
	return 0;
}
/*
 * "Encode" a block with the null codec. Nothing is copied: the codec just
 * points at the caller's buffer and records `len` as both its capacity and
 * its used size. Always returns 0.
 */
static int encode_null(avro_codec_t c, void * data, int64_t len)
{
	c->used_size = c->block_size = len;
	c->block_data = data;
	return 0;
}
/*
 * "Decode" a block with the null codec. The input is exposed unchanged:
 * block_data aliases `data`, and block_size and used_size are both set to
 * `len`. Always returns 0.
 */
static int decode_null(avro_codec_t c, void * data, int64_t len)
{
	c->used_size = c->block_size = len;
	c->block_data = data;
	return 0;
}
/*
 * Return a null codec to its pristine state. block_data is only cleared,
 * never freed, because the null encode/decode paths store the caller's
 * pointer there. Always returns 0.
 */
static int reset_null(avro_codec_t c)
{
	c->codec_data = NULL;
	c->used_size = 0;
	c->block_size = 0;
	c->block_data = NULL;
	return 0;
}
/* Snappy codec */
#ifdef SNAPPY_CODEC
/*
 * Set up `codec` as a Snappy codec. No private state is attached
 * (codec_data stays NULL) and no buffer is allocated here.
 * Always returns 0.
 */
static int
codec_snappy(avro_codec_t codec)
{
	codec->codec_data = NULL;
	codec->block_data = NULL;
	codec->used_size = 0;
	codec->block_size = 0;
	codec->type = AVRO_CODEC_SNAPPY;
	codec->name = "snappy";
	return 0;
}
/*
 * Compress `len` bytes at `data` with Snappy into the codec-owned buffer and
 * append the CRC32 of the uncompressed input as a 4-byte big-endian trailer.
 *
 * Returns 0 on success, with c->block_data holding the output and
 * c->used_size its length (compressed bytes + 4). Returns 1 after calling
 * avro_set_error() on failure; an already allocated buffer is kept as is.
 */
static int encode_snappy(avro_codec_t c, void * data, int64_t len)
{
	size_t outlen = snappy_max_compressed_length(len);
	int64_t needed = (int64_t) (outlen + 4);	/* + CRC trailer */

	if (!c->block_data) {
		c->block_data = avro_malloc(needed);
		if (!c->block_data) {
			avro_set_error("Cannot allocate memory for snappy");
			return 1;
		}
		c->block_size = needed;
	} else if (c->block_size < needed) {
		/* Grow through a temporary so a failed grow neither loses the old
		 * pointer nor records a size that was never allocated.
		 * NOTE(review): assumes avro_realloc, like realloc, leaves the old
		 * block valid on failure -- confirm for custom allocators. */
		void *grown = avro_realloc(c->block_data, c->block_size, needed);
		if (!grown) {
			avro_set_error("Cannot allocate memory for snappy");
			return 1;
		}
		c->block_data = grown;
		c->block_size = needed;
	}

	if (snappy_compress(data, len, c->block_data, &outlen) != SNAPPY_OK)
	{
		avro_set_error("Error compressing block with Snappy");
		return 1;
	}

	/* Serialise the checksum byte by byte so the trailer is big-endian
	 * regardless of the host's byte order. */
	uint32_t crc = (uint32_t) crc32(0, data, len);
	unsigned char trailer[4] = {
		(unsigned char) (crc >> 24),
		(unsigned char) (crc >> 16),
		(unsigned char) (crc >> 8),
		(unsigned char) crc
	};
	memcpy((char *) c->block_data + outlen, trailer, sizeof trailer);

	c->used_size = outlen+4;
	return 0;
}
/*
 * Decompress a Snappy block of `len` bytes at `data`. The last 4 bytes of
 * the block are a big-endian CRC32 of the uncompressed data; everything
 * before them is the Snappy payload.
 *
 * Returns 0 on success, with c->block_data holding the decompressed bytes
 * and c->used_size their count. Returns 1 after calling avro_set_error() if
 * the block is too short, cannot be decompressed, memory cannot be
 * obtained, or the checksum does not match.
 */
static int decode_snappy(avro_codec_t c, void * data, int64_t len)
{
	size_t outlen;

	/* The block must at least contain the CRC trailer; without this check
	 * len-4 would turn into a huge size_t below. */
	if (len < 4) {
		avro_set_error("Snappy block too short to contain CRC32");
		return 1;
	}

	if (snappy_uncompressed_length(data, len-4, &outlen) != SNAPPY_OK) {
		avro_set_error("Uncompressed length error in snappy");
		return 1;
	}

	if (!c->block_data) {
		c->block_data = avro_malloc(outlen);
		if (!c->block_data) {
			avro_set_error("Cannot allocate memory for snappy");
			return 1;
		}
		c->block_size = outlen;
	} else if ( (size_t)c->block_size < outlen) {
		/* Grow through a temporary so the old buffer is not lost and
		 * block_size stays truthful if the allocation fails.
		 * NOTE(review): assumes avro_realloc, like realloc, leaves the old
		 * block valid on failure -- confirm for custom allocators. */
		void *grown = avro_realloc(c->block_data, c->block_size, outlen);
		if (!grown) {
			avro_set_error("Cannot allocate memory for snappy");
			return 1;
		}
		c->block_data = grown;
		c->block_size = outlen;
	}

	if (snappy_uncompress(data, len-4, c->block_data, &outlen) != SNAPPY_OK)
	{
		avro_set_error("Error uncompressing block with Snappy");
		return 1;
	}

	/* Compare against a big-endian rendering of the checksum so the check
	 * does not depend on the host's byte order. */
	uint32_t crc = (uint32_t) crc32(0, c->block_data, outlen);
	unsigned char expected[4] = {
		(unsigned char) (crc >> 24),
		(unsigned char) (crc >> 16),
		(unsigned char) (crc >> 8),
		(unsigned char) crc
	};
	if (memcmp(expected, (char*)data+len-4, sizeof expected))
	{
		avro_set_error("CRC32 check failure uncompressing block with Snappy");
		return 1;
	}

	c->used_size = outlen;
	return 0;
}
/*
 * Release the Snappy codec's output buffer (if any) and clear every field
 * that describes it. Always returns 0.
 */
static int reset_snappy(avro_codec_t c)
{
	void *buf = c->block_data;

	if (buf != NULL) {
		avro_free(buf, c->block_size);
	}

	c->codec_data = NULL;
	c->used_size = 0;
	c->block_size = 0;
	c->block_data = NULL;
	return 0;
}
#endif // SNAPPY_CODEC
/* Deflate codec */
#ifdef DEFLATE_CODEC
/* Private state of a deflate codec: one zlib stream per direction. */
struct codec_data_deflate {
	z_stream deflate;	/* used by encode_deflate */
	z_stream inflate;	/* used by decode_deflate */
};
/* Accessors yielding a z_stream* from an untyped codec_data pointer.
 * Argument and expansion are fully parenthesised so they are safe inside
 * larger expressions. */
#define codec_data_deflate_stream(cd) (&((struct codec_data_deflate *)(cd))->deflate)
#define codec_data_inflate_stream(cd) (&((struct codec_data_deflate *)(cd))->inflate)
/*
 * Set up `codec` as a deflate codec: allocates the private state and
 * initialises one raw-deflate stream for compression and one for
 * decompression (window bits -15, i.e. no zlib header or trailer).
 *
 * Returns 0 on success. Returns 1 after calling avro_set_error() on
 * failure, with codec->codec_data left NULL and nothing leaked.
 */
static int
codec_deflate(avro_codec_t codec)
{
	codec->name = "deflate";
	codec->type = AVRO_CODEC_DEFLATE;
	codec->block_size = 0;
	codec->used_size = 0;
	codec->block_data = NULL;
	codec->codec_data = avro_new(struct codec_data_deflate);

	if (!codec->codec_data) {
		avro_set_error("Cannot allocate memory for zlib");
		return 1;
	}

	z_stream *ds = codec_data_deflate_stream(codec->codec_data);
	z_stream *is = codec_data_inflate_stream(codec->codec_data);

	memset(ds, 0, sizeof(z_stream));
	memset(is, 0, sizeof(z_stream));

	/* Z_NULL allocator hooks make zlib use its default allocator. */
	ds->zalloc = is->zalloc = Z_NULL;
	ds->zfree = is->zfree = Z_NULL;
	ds->opaque = is->opaque = Z_NULL;

	if (deflateInit2(ds, Z_BEST_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
		avro_freet(struct codec_data_deflate, codec->codec_data);
		codec->codec_data = NULL;
		avro_set_error("Cannot initialize zlib deflate");
		return 1;
	}

	if (inflateInit2(is, -15) != Z_OK) {
		/* The deflate stream is already live at this point; release its
		 * internal state before dropping the struct that holds it. */
		deflateEnd(ds);
		avro_freet(struct codec_data_deflate, codec->codec_data);
		codec->codec_data = NULL;
		avro_set_error("Cannot initialize zlib inflate");
		return 1;
	}

	return 0;
}
/*
 * Compress `len` bytes at `data` with raw deflate into the codec-owned
 * buffer, in a single deflate(Z_FINISH) call.
 *
 * Returns 0 on success, with c->block_data holding the output and
 * c->used_size its length. Returns 1 after calling avro_set_error() on
 * failure. c->block_size always reflects the real allocation size, so it
 * stays valid as the "old size" argument of avro_realloc/avro_free. The
 * stream is reset after every call, successful or not, so the codec can be
 * reused.
 */
static int encode_deflate(avro_codec_t c, void * data, int64_t len)
{
	int err;
	int64_t defl_len = compressBound((uLong)len * 1.2);

	if (!c->block_data) {
		c->block_data = avro_malloc(defl_len);
		if (!c->block_data) {
			avro_set_error("Cannot allocate memory for deflate");
			return 1;
		}
		c->block_size = defl_len;
	} else if ( c->block_size < defl_len) {
		/* Grow through a temporary so a failed grow neither loses the old
		 * buffer nor records a size that was never allocated.
		 * NOTE(review): assumes avro_realloc, like realloc, leaves the old
		 * block valid on failure -- confirm for custom allocators. */
		void *grown = avro_realloc(c->block_data, c->block_size, defl_len);
		if (!grown) {
			avro_set_error("Cannot allocate memory for deflate");
			return 1;
		}
		c->block_data = grown;
		c->block_size = defl_len;
	}

	c->used_size = 0;

	z_stream *s = codec_data_deflate_stream(c->codec_data);

	s->next_in = (Bytef*)data;
	s->avail_in = (uInt)len;
	s->next_out = c->block_data;
	s->avail_out = (uInt)c->block_size;

	s->total_out = 0;

	err = deflate(s, Z_FINISH);
	if (err != Z_STREAM_END) {
		/* With Z_FINISH anything short of Z_STREAM_END (including Z_OK)
		 * means the block was not completely written, so report it rather
		 * than hand back an empty "success". Reset instead of ending the
		 * stream so later calls still work. */
		deflateReset(s);
		avro_set_error("Error compressing block with deflate (%i)", err);
		return 1;
	}

	/* Only the used size changes; block_size must keep describing the
	 * allocation, which zlib never resizes. */
	c->used_size = s->total_out;

	if (deflateReset(s) != Z_OK) {
		avro_set_error("Error resetting deflate compression");
		return 1;
	}

	return 0;
}
/*
 * Decompress a raw-deflate block of `len` bytes at `data` into the
 * codec-owned buffer, doubling the buffer until the whole block fits.
 *
 * Returns 0 on success, with c->block_data holding the output and
 * c->used_size its length. Returns 1 after calling avro_set_error() on
 * corrupt input, allocation failure or any other zlib error. The inflate
 * stream is reset after every call, successful or not, so the codec can be
 * reused.
 */
static int decode_deflate(avro_codec_t c, void * data, int64_t len)
{
	int err;
	z_stream *s = codec_data_inflate_stream(c->codec_data);

	if (!c->block_data) {
		c->block_data = avro_malloc(DEFAULT_BLOCK_SIZE);
		if (!c->block_data) {
			avro_set_error("Cannot allocate memory for deflate");
			return 1;
		}
		c->block_size = DEFAULT_BLOCK_SIZE;
	}

	c->used_size = 0;

	s->next_in = data;
	s->avail_in = len;
	s->next_out = c->block_data;
	s->avail_out = c->block_size;

	s->total_out = 0;

	do
	{
		err = inflate(s, Z_FINISH);

		// Apparently if there is yet available space in the output then something
		// has gone wrong in decompressing the data (according to cpython zlibmodule.c)
		if (err == Z_BUF_ERROR && s->avail_out > 0) {
			inflateReset(s);
			avro_set_error("Error decompressing block with deflate, possible data error");
			return 1;
		}

		// The buffer was not big enough. resize it.
		if (err == Z_BUF_ERROR)
		{
			/* Grow through a temporary: on failure the old buffer stays
			 * owned by the codec and is never dereferenced as NULL.
			 * NOTE(review): assumes avro_realloc, like realloc, leaves the
			 * old block valid on failure -- confirm for custom allocators. */
			void *grown = avro_realloc(c->block_data, c->block_size, c->block_size * 2);
			if (!grown) {
				inflateReset(s);
				avro_set_error("Cannot allocate memory for deflate");
				return 1;
			}
			c->block_data = grown;
			/* Continue writing right after what has been produced so far;
			 * the newly added half is the extra space available. */
			s->next_out = (Bytef *) c->block_data + s->total_out;
			s->avail_out += c->block_size;
			c->block_size = c->block_size * 2;
		}
	} while (err == Z_BUF_ERROR);

	if (err != Z_STREAM_END) {
		/* Z_FINISH asks for the whole block at once, so anything other than
		 * Z_STREAM_END means it was not fully decoded. */
		inflateReset(s);
		avro_set_error("Error decompressing block with deflate (%i)", err);
		return 1;
	}

	c->used_size = s->total_out;

	if (inflateReset(s) != Z_OK) {
		avro_set_error("Error resetting deflate decompression");
		return 1;
	}

	return 0;
}
/*
 * Tear down a deflate codec: frees the output buffer, ends both zlib
 * streams, frees the private state and clears all fields.
 * Always returns 0.
 */
static int reset_deflate(avro_codec_t c)
{
	void *state = c->codec_data;

	if (c->block_data != NULL) {
		avro_free(c->block_data, c->block_size);
	}

	if (state != NULL) {
		deflateEnd(codec_data_deflate_stream(state));
		inflateEnd(codec_data_inflate_stream(state));
		avro_freet(struct codec_data_deflate, state);
	}

	c->codec_data = NULL;
	c->used_size = 0;
	c->block_size = 0;
	c->block_data = NULL;
	return 0;
}
#endif // DEFLATE_CODEC
/* LZMA codec */
#ifdef LZMA_CODEC
/* Private state of an lzma codec: the filter chain handed to the raw
 * encoder/decoder, plus the options that filters[0] points at. */
struct codec_data_lzma {
	lzma_filter filters[2];
	lzma_options_lzma options;
};
/* Accessors for an untyped codec_data pointer. Argument and expansion are
 * fully parenthesised so they are safe inside larger expressions. */
#define codec_data_lzma_filters(cd) (((struct codec_data_lzma *)(cd))->filters)
#define codec_data_lzma_options(cd) (&((struct codec_data_lzma *)(cd))->options)
/*
 * Set up `codec` as an lzma codec: allocates the private state, fills the
 * LZMA options from the default preset and builds a one-filter (LZMA2)
 * chain terminated by LZMA_VLI_UNKNOWN.
 *
 * Returns 0 on success. Returns 1 after calling avro_set_error() on
 * failure, with codec->codec_data left NULL.
 */
static int
codec_lzma(avro_codec_t codec)
{
	codec->name = "lzma";
	codec->type = AVRO_CODEC_LZMA;
	codec->block_size = 0;
	codec->used_size = 0;
	codec->block_data = NULL;
	codec->codec_data = avro_new(struct codec_data_lzma);

	if (!codec->codec_data) {
		avro_set_error("Cannot allocate memory for lzma");
		return 1;
	}

	lzma_options_lzma* opt = codec_data_lzma_options(codec->codec_data);
	/* lzma_lzma_preset() returns true when the preset is not supported;
	 * do not continue with half-initialised options in that case. */
	if (lzma_lzma_preset(opt, LZMA_PRESET_DEFAULT)) {
		avro_freet(struct codec_data_lzma, codec->codec_data);
		codec->codec_data = NULL;
		avro_set_error("Cannot initialize lzma preset");
		return 1;
	}

	lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);
	filters[0].id = LZMA_FILTER_LZMA2;
	filters[0].options = opt;
	filters[1].id = LZMA_VLI_UNKNOWN;	/* end-of-chain marker */
	filters[1].options = NULL;

	return 0;
}
/*
 * Compress `len` bytes at `data` with the codec's raw LZMA2 filter chain
 * into the codec-owned buffer.
 *
 * Returns 0 on success, with codec->block_data holding the output and
 * codec->used_size its length. Returns 1 after calling avro_set_error() on
 * failure.
 */
static int encode_lzma(avro_codec_t codec, void * data, int64_t len)
{
	lzma_ret ret;
	size_t written = 0;
	lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);

	/* NOTE(review): the output buffer is sized as input length plus the
	 * encoder's reported memory usage, which is generous slack rather than
	 * a true worst-case output bound; kept as the existing sizing policy. */
	uint64_t slack = lzma_raw_encoder_memusage(filters);
	if (slack == UINT64_MAX) {
		/* liblzma reports an unusable filter chain this way. */
		avro_set_error("Error in lzma encoder");
		return 1;
	}
	int64_t buff_len = (int64_t) (len + slack);

	if (!codec->block_data) {
		codec->block_data = avro_malloc(buff_len);
		if (!codec->block_data) {
			avro_set_error("Cannot allocate memory for lzma encoder");
			return 1;
		}
		codec->block_size = buff_len;
	} else if (codec->block_size < buff_len) {
		/* A buffer left over from an earlier call may be too small for this
		 * block; grow it like the other codecs do. The temporary keeps the
		 * old buffer owned if the grow fails.
		 * NOTE(review): assumes avro_realloc, like realloc, leaves the old
		 * block valid on failure -- confirm for custom allocators. */
		void *grown = avro_realloc(codec->block_data, codec->block_size, buff_len);
		if (!grown) {
			avro_set_error("Cannot allocate memory for lzma encoder");
			return 1;
		}
		codec->block_data = grown;
		codec->block_size = buff_len;
	}

	ret = lzma_raw_buffer_encode(filters, NULL, data, len, codec->block_data, &written, codec->block_size);

	codec->used_size = written;

	if (ret != LZMA_OK) {
		avro_set_error("Error in lzma encoder");
		return 1;
	}

	return 0;
}
/*
 * Decompress a raw LZMA2 block of `len` bytes at `data` into the
 * codec-owned buffer, doubling the buffer and retrying until the output
 * fits.
 *
 * Returns 0 on success, with codec->block_data holding the output and
 * codec->used_size its length. Returns 1 after calling avro_set_error() on
 * a decoder error or allocation failure.
 */
static int decode_lzma(avro_codec_t codec, void * data, int64_t len)
{
	size_t read_pos = 0;
	size_t write_pos = 0;
	lzma_ret ret;
	lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);

	if (!codec->block_data) {
		codec->block_data = avro_malloc(DEFAULT_BLOCK_SIZE);
		if (!codec->block_data) {
			avro_set_error("Cannot allocate memory for lzma decoder");
			return 1;
		}
		codec->block_size = DEFAULT_BLOCK_SIZE;
	}

	codec->used_size = 0;

	for (;;) {
		/* Each call is a self-contained single-shot decode, so every
		 * attempt has to start from the beginning of input and output. */
		read_pos = 0;
		write_pos = 0;

		ret = lzma_raw_buffer_decode(filters, NULL, data,
			&read_pos, len, codec->block_data, &write_pos,
			codec->block_size);

		if (ret != LZMA_BUF_ERROR) {
			break;
		}

		/* Output buffer too small: double it and try again. The temporary
		 * keeps the old buffer owned if the grow fails.
		 * NOTE(review): assumes avro_realloc, like realloc, leaves the old
		 * block valid on failure -- confirm for custom allocators. */
		void *grown = avro_realloc(codec->block_data, codec->block_size, codec->block_size * 2);
		if (!grown) {
			avro_set_error("Cannot allocate memory for lzma decoder");
			return 1;
		}
		codec->block_data = grown;
		codec->block_size = codec->block_size * 2;
	}

	if (ret != LZMA_OK) {
		avro_set_error("Error in lzma decoder");
		return 1;
	}

	codec->used_size = write_pos;
	return 0;
}
/*
 * Tear down an lzma codec: frees the output buffer and the private filter
 * state (when present) and clears all fields. Always returns 0.
 */
static int reset_lzma(avro_codec_t c)
{
	void *buf = c->block_data;
	void *state = c->codec_data;

	if (buf != NULL) {
		avro_free(buf, c->block_size);
	}

	if (state != NULL) {
		avro_freet(struct codec_data_lzma, state);
	}

	c->codec_data = NULL;
	c->used_size = 0;
	c->block_size = 0;
	c->block_data = NULL;
	return 0;
}
#endif // LZMA_CODEC
/* Common interface */
/*
 * Initialise `codec` for the codec named by `type`.
 *
 * A NULL `type` or the name "null" selects the pass-through codec.
 * "snappy", "deflate" and "lzma" are recognised only when the matching
 * *_CODEC macro was defined at build time. Any other name is rejected.
 *
 * Returns the selected initialiser's result, or 1 after calling
 * avro_set_error() for an unknown name.
 */
int avro_codec(avro_codec_t codec, const char *type)
{
	/* The codec names are distinct, so the order of the comparisons does
	 * not matter; handle both spellings of "no compression" up front. */
	if (type == NULL || strcmp("null", type) == 0) {
		return codec_null(codec);
	}

#ifdef SNAPPY_CODEC
	if (!strcmp("snappy", type)) {
		return codec_snappy(codec);
	}
#endif

#ifdef DEFLATE_CODEC
	if (!strcmp("deflate", type)) {
		return codec_deflate(codec);
	}
#endif

#ifdef LZMA_CODEC
	if (!strcmp("lzma", type)) {
		return codec_lzma(codec);
	}
#endif

	avro_set_error("Unknown codec %s", type);
	return 1;
}
/*
 * Encode `len` bytes at `data` with whichever codec `c` was initialised
 * as. Returns the codec-specific encoder's result, or 1 when c->type does
 * not match any codec compiled into this build.
 */
int avro_codec_encode(avro_codec_t c, void * data, int64_t len)
{
	int rc = 1;	/* result for an unrecognised codec type */

	switch (c->type) {
	case AVRO_CODEC_NULL:
		rc = encode_null(c, data, len);
		break;
#ifdef SNAPPY_CODEC
	case AVRO_CODEC_SNAPPY:
		rc = encode_snappy(c, data, len);
		break;
#endif
#ifdef DEFLATE_CODEC
	case AVRO_CODEC_DEFLATE:
		rc = encode_deflate(c, data, len);
		break;
#endif
#ifdef LZMA_CODEC
	case AVRO_CODEC_LZMA:
		rc = encode_lzma(c, data, len);
		break;
#endif
	default:
		break;
	}

	return rc;
}
/*
 * Decode `len` bytes at `data` with whichever codec `c` was initialised
 * as. Returns the codec-specific decoder's result, or 1 when c->type does
 * not match any codec compiled into this build.
 */
int avro_codec_decode(avro_codec_t c, void * data, int64_t len)
{
	int rc = 1;	/* result for an unrecognised codec type */

	switch (c->type) {
	case AVRO_CODEC_NULL:
		rc = decode_null(c, data, len);
		break;
#ifdef SNAPPY_CODEC
	case AVRO_CODEC_SNAPPY:
		rc = decode_snappy(c, data, len);
		break;
#endif
#ifdef DEFLATE_CODEC
	case AVRO_CODEC_DEFLATE:
		rc = decode_deflate(c, data, len);
		break;
#endif
#ifdef LZMA_CODEC
	case AVRO_CODEC_LZMA:
		rc = decode_lzma(c, data, len);
		break;
#endif
	default:
		break;
	}

	return rc;
}
/*
 * Reset `c` through the reset routine of whichever codec it was
 * initialised as. Returns that routine's result, or 1 when c->type does
 * not match any codec compiled into this build.
 */
int avro_codec_reset(avro_codec_t c)
{
	int rc = 1;	/* result for an unrecognised codec type */

	switch (c->type) {
	case AVRO_CODEC_NULL:
		rc = reset_null(c);
		break;
#ifdef SNAPPY_CODEC
	case AVRO_CODEC_SNAPPY:
		rc = reset_snappy(c);
		break;
#endif
#ifdef DEFLATE_CODEC
	case AVRO_CODEC_DEFLATE:
		rc = reset_deflate(c);
		break;
#endif
#ifdef LZMA_CODEC
	case AVRO_CODEC_LZMA:
		rc = reset_lzma(c);
		break;
#endif
	default:
		break;
	}

	return rc;
}