// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package compress contains the interfaces and implementations for handling compression/decompression
// of parquet data at the column level.
package compress

import (
	"compress/flate"
	"io"
	"io/ioutil"

	"github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
	"golang.org/x/xerrors"
)

// Compression is an alias to the thrift compression codec enum type for easy use
type Compression parquet.CompressionCodec

func (c Compression) String() string {
	return parquet.CompressionCodec(c).String()
}

// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries
// use that to denote "use the default".
const DefaultCompressionLevel = flate.DefaultCompression

// Codecs is a struct providing namespaced enum values for specifying the compression type,
// which makes for easy internal swapping between them and the thrift enum since they are
// initialized to the same constant values.
var Codecs = struct {
	Uncompressed Compression
	Snappy       Compression
	Gzip         Compression
	// LZO is unsupported in this library since the LZO license is incompatible with the Apache License
	Lzo    Compression
	Brotli Compression
	// LZ4 is unsupported in this library due to incompatibilities between the Hadoop LZ4 spec and regular lz4,
	// see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAAri41v24xuA8MGHLDvgSnE+7AAgOhiEukemW_oPNHMvfMmrWw@mail.gmail.com%3E
	Lz4  Compression
	Zstd Compression
}{
	Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
	Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
	Gzip:         Compression(parquet.CompressionCodec_GZIP),
	Lzo:          Compression(parquet.CompressionCodec_LZO),
	Brotli:       Compression(parquet.CompressionCodec_BROTLI),
	Lz4:          Compression(parquet.CompressionCodec_LZ4),
	Zstd:         Compression(parquet.CompressionCodec_ZSTD),
}
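
// Because Compression is initialized to the same constant values as the thrift enum, converting
// between the two is a plain type conversion. A minimal illustrative sketch (not part of this file):
//
//	c := Codecs.Snappy
//	thriftCodec := parquet.CompressionCodec(c) // parquet.CompressionCodec_SNAPPY
//	back := Compression(thriftCodec)           // equal to Codecs.Snappy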

// Codec is an interface that is implemented for each compression type so that interactions
// with the different codecs are uniform. Most consumers won't be calling GetCodec directly.
type Codec interface {
	// NewReader provides a reader that wraps a stream of compressed data in order to stream the uncompressed data
	NewReader(io.Reader) io.ReadCloser
	// NewWriter provides a wrapper around a write stream to compress data before writing it.
	NewWriter(io.Writer) io.WriteCloser
	// NewWriterLevel is like NewWriter but allows specifying the compression level
	NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
	// Encode encodes a block of data given by src and returns the compressed block. dst should be either nil
	// or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not
	// overlap since some of the compression types don't allow it.
	//
	// The returned slice will be one of the following:
	//	1. If dst was nil or dst was too small to fit the compressed data, it will be a newly allocated slice.
	//	2. If dst was large enough to fit the compressed data (depending on the compression algorithm it might
	//	   be required to be at least CompressBound length), then it might be a slice of dst.
	Encode(dst, src []byte) []byte
	// EncodeLevel is like Encode, but specifies a particular encoding level instead of the default.
	EncodeLevel(dst, src []byte, level int) []byte
	// CompressBound returns the maximum possible size of compressed data of the given length under the chosen codec.
	CompressBound(int64) int64
	// Decode decodes a single block rather than a stream. As with Encode, dst must be either nil or
	// sized large enough to accommodate the uncompressed data, and should not overlap with src.
	//
	// The returned slice *might* be a slice of dst.
	Decode(dst, src []byte) []byte
}
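
// A minimal block-oriented usage sketch (illustrative only; it assumes a codec for Gzip has
// been registered, which happens elsewhere in this package):
//
//	codec, err := GetCodec(Codecs.Gzip)
//	if err != nil {
//		// the requested compression type is not implemented
//	}
//	src := []byte("column chunk bytes")
//	dst := make([]byte, codec.CompressBound(int64(len(src))))
//	compressed := codec.Encode(dst, src)
//	// Decode requires dst to be nil or pre-sized to the uncompressed length.
//	uncompressed := codec.Decode(make([]byte, len(src)), compressed)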

var codecs = map[Compression]Codec{}

// nocodec is the no-op Codec implementation used for uncompressed data.
type nocodec struct{}

func (nocodec) NewReader(r io.Reader) io.ReadCloser {
	ret, ok := r.(io.ReadCloser)
	if !ok {
		return ioutil.NopCloser(r)
	}
	return ret
}

// Decode for the no-op codec simply copies src into dst; the caller is expected
// to have allocated dst large enough to hold the data.
func (nocodec) Decode(dst, src []byte) []byte {
	if dst != nil {
		copy(dst, src)
	}
	return dst
}

// writerNopCloser wraps an io.Writer with a no-op Close so it satisfies io.WriteCloser.
type writerNopCloser struct {
	io.Writer
}

func (writerNopCloser) Close() error {
	return nil
}

// Encode for the no-op codec simply copies src into dst, which the caller must allocate.
func (nocodec) Encode(dst, src []byte) []byte {
	copy(dst, src)
	return dst
}

func (nocodec) EncodeLevel(dst, src []byte, _ int) []byte {
	copy(dst, src)
	return dst
}

func (nocodec) NewWriter(w io.Writer) io.WriteCloser {
	ret, ok := w.(io.WriteCloser)
	if !ok {
		return writerNopCloser{w}
	}
	return ret
}

func (n nocodec) NewWriterLevel(w io.Writer, _ int) (io.WriteCloser, error) {
	return n.NewWriter(w), nil
}

// CompressBound for the no-op codec is just the input length, since nothing is compressed.
func (nocodec) CompressBound(len int64) int64 { return len }

func init() {
	codecs[Codecs.Uncompressed] = nocodec{}
}
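
// Concrete codecs can register themselves the same way from their own files: an init function
// adds the implementation to the codecs map. A hypothetical sketch (gzipCodec here is
// illustrative, not the actual type name used in this package):
//
//	func init() {
//		codecs[Codecs.Gzip] = gzipCodec{}
//	}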

// GetCodec returns a Codec interface for the requested Compression type
func GetCodec(typ Compression) (Codec, error) {
	ret, ok := codecs[typ]
	if !ok {
		return nil, xerrors.Errorf("compression for %s unimplemented", typ.String())
	}
	return ret, nil
}
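
// A streaming usage sketch (illustrative only), compressing to a buffer and reading it back;
// bytes and ioutil here are used only for the example:
//
//	codec, _ := GetCodec(Codecs.Gzip)
//	var buf bytes.Buffer
//	wr, err := codec.NewWriterLevel(&buf, DefaultCompressionLevel)
//	if err == nil {
//		wr.Write([]byte("column chunk bytes"))
//		wr.Close()
//	}
//	rdr := codec.NewReader(&buf)
//	uncompressed, _ := ioutil.ReadAll(rdr)
//	rdr.Close()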