| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
// Package compress contains the interfaces and implementations for handling compression/decompression
// of parquet data at the column level.
| package compress |
| |
| import ( |
| "compress/flate" |
| "io" |
| "io/ioutil" |
| |
| "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" |
| "golang.org/x/xerrors" |
| ) |
| |
// Compression is a defined type whose underlying type is the thrift parquet.CompressionCodec enum
// (a distinct type, not a Go type alias). Values convert directly between the two, e.g.
// Compression(parquet.CompressionCodec_SNAPPY) and parquet.CompressionCodec(c).
type Compression parquet.CompressionCodec
| |
| func (c Compression) String() string { |
| return parquet.CompressionCodec(c).String() |
| } |
| |
// DefaultCompressionLevel is the level value to pass (for example to Codec.NewWriterLevel or
// Codec.EncodeLevel) to ask a codec to use its own default level. It reuses flate.DefaultCompression
// because many of the compression libraries use that same value to denote "use the default".
const DefaultCompressionLevel = flate.DefaultCompression
| |
// Codecs provides namespaced enum values for specifying the compression type to use (e.g. Codecs.Snappy).
// Each field is initialized to the matching thrift parquet.CompressionCodec constant, which makes it easy
// to swap internally between these values and the thrift enum.
var Codecs = struct {
	Uncompressed Compression
	Snappy       Compression
	Gzip         Compression
	// LZO is unsupported in this library since the LZO license is incompatible with the Apache License
	Lzo    Compression
	Brotli Compression
	// LZ4 is unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
	// see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAAri41v24xuA8MGHLDvgSnE+7AAgOhiEukemW_oPNHMvfMmrWw@mail.gmail.com%3E
	Lz4  Compression
	Zstd Compression
}{
	Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
	Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
	Gzip:         Compression(parquet.CompressionCodec_GZIP),
	Lzo:          Compression(parquet.CompressionCodec_LZO),
	Brotli:       Compression(parquet.CompressionCodec_BROTLI),
	Lz4:          Compression(parquet.CompressionCodec_LZ4),
	Zstd:         Compression(parquet.CompressionCodec_ZSTD),
}
| |
// Codec is implemented once per compression type so that callers can compress and decompress data,
// either as streams or as whole blocks, through a single interface. Implementations are looked up
// with GetCodec, although most consumers won't be calling GetCodec directly.
type Codec interface {
	// NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data.
	NewReader(io.Reader) io.ReadCloser
	// NewWriter provides a wrapper around a write stream to compress data before writing it.
	NewWriter(io.Writer) io.WriteCloser
	// NewWriterLevel is like NewWriter but allows specifying the compression level
	// (DefaultCompressionLevel requests the codec's default).
	NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
	// Encode encodes a block of data given by src and returns the compressed block. dst should be either nil
	// or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not
	// overlap since some of the compression types don't allow it.
	//
	// The returned slice will be one of the following:
	//  1. If dst was nil or dst was too small to fit the compressed data, it will be a newly allocated slice.
	//  2. If dst was large enough to fit the compressed data (depending on the compression algorithm it might
	//     be required to be at least CompressBound length) then it might be a slice of dst.
	Encode(dst, src []byte) []byte
	// EncodeLevel is like Encode, but specifies a particular encoding level instead of the default.
	EncodeLevel(dst, src []byte, level int) []byte
	// CompressBound returns the maximum size that the compressed form of an input of the given length
	// can occupy under this codec; use it to size the dst argument passed to Encode.
	CompressBound(int64) int64
	// Decode is for decoding a single block rather than a stream. As with Encode, dst must be either nil or
	// sized large enough to accommodate the uncompressed data, and it should not overlap with src.
	//
	// The returned slice *might* be a slice of dst.
	Decode(dst, src []byte) []byte
}
| |
| var codecs = map[Compression]Codec{} |
| |
// nocodec is the pass-through Codec used for uncompressed data: every operation
// forwards or copies bytes without transforming them.
type nocodec struct{}

// NewReader returns r as an io.ReadCloser. If r already implements io.ReadCloser it
// is returned unchanged, so closing the result also closes r. Otherwise r is wrapped
// with a no-op Close.
func (nocodec) NewReader(r io.Reader) io.ReadCloser {
	if rc, ok := r.(io.ReadCloser); ok {
		return rc
	}
	return ioutil.NopCloser(r)
}

// Decode "decompresses" src by copying it into dst, following the same buffer
// rules as Encode. Uncompressed data is already in its decoded form.
func (n nocodec) Decode(dst, src []byte) []byte {
	return n.Encode(dst, src)
}

// writerNopCloser adapts an io.Writer into an io.WriteCloser whose Close does nothing.
type writerNopCloser struct {
	io.Writer
}

// Close is a no-op and always returns nil.
func (writerNopCloser) Close() error {
	return nil
}

// Encode copies src into dst without compressing it, honoring the Codec buffer contract:
//   - If dst is nil or its capacity is smaller than len(src), a new slice is allocated.
//     Previously a nil dst returned nil and a short dst silently truncated the data.
//   - Otherwise dst's backing array is reused.
//
// The returned slice always has length len(src), so an oversized dst does not leak
// stale trailing bytes into the result.
func (nocodec) Encode(dst, src []byte) []byte {
	if cap(dst) < len(src) {
		dst = make([]byte, len(src))
	}
	// Reslicing within capacity is valid even when len(dst) < len(src).
	dst = dst[:len(src)]
	copy(dst, src)
	return dst
}

// EncodeLevel behaves exactly like Encode; a compression level has no meaning for
// uncompressed data, so it is ignored.
func (n nocodec) EncodeLevel(dst, src []byte, _ int) []byte {
	return n.Encode(dst, src)
}

// NewWriter returns w as an io.WriteCloser. If w already implements io.WriteCloser it
// is returned unchanged, so closing the result also closes w. Otherwise w is wrapped
// with a no-op Close.
func (nocodec) NewWriter(w io.Writer) io.WriteCloser {
	if wc, ok := w.(io.WriteCloser); ok {
		return wc
	}
	return writerNopCloser{w}
}

// NewWriterLevel is NewWriter with the level ignored; it never returns an error.
func (n nocodec) NewWriterLevel(w io.Writer, _ int) (io.WriteCloser, error) {
	return n.NewWriter(w), nil
}

// CompressBound returns n unchanged: the "compressed" output is exactly the input.
func (nocodec) CompressBound(n int64) int64 { return n }
| |
| func init() { |
| codecs[Codecs.Uncompressed] = nocodec{} |
| } |
| |
| // GetCodec returns a Codec interface for the requested Compression type |
| func GetCodec(typ Compression) (Codec, error) { |
| ret, ok := codecs[typ] |
| if !ok { |
| return nil, xerrors.Errorf("compression for %s unimplemented", typ.String()) |
| } |
| return ret, nil |
| } |