golang: support LZ4 and ZSTD compression algorithm (#958)
* support LZ4 and ZSTD compression algorithm
* update zstd
* update go mod
---------
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
diff --git a/golang/go.mod b/golang/go.mod
index 921328b..30efe3c 100644
--- a/golang/go.mod
+++ b/golang/go.mod
@@ -6,7 +6,9 @@
github.com/go-playground/validator/v10 v10.11.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
+ github.com/klauspost/compress v1.16.7
github.com/natefinch/lumberjack v2.0.0+incompatible
+ github.com/pierrec/lz4 v2.6.1+incompatible
github.com/prashantv/gostub v1.1.0
github.com/stretchr/testify v1.7.1
go.uber.org/zap v1.21.0
@@ -19,8 +21,9 @@
require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/frankban/quicktest v1.14.6 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
- github.com/google/go-cmp v0.5.8 // indirect
+ github.com/google/go-cmp v0.5.9 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -36,7 +39,7 @@
github.com/dchest/siphash v1.2.3
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
- github.com/golang/protobuf v1.5.2 // indirect
+ github.com/golang/protobuf v1.5.2
github.com/leodido/go-urn v1.2.1 // indirect
github.com/valyala/fastrand v1.1.0
go.opencensus.io v0.22.5
diff --git a/golang/go.sum b/golang/go.sum
index 2cad4a7..8e74118 100644
--- a/golang/go.sum
+++ b/golang/go.sum
@@ -32,6 +32,8 @@
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
+github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
@@ -72,8 +74,8 @@
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
-github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -87,10 +89,21 @@
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/klauspost/compress v1.11.1 h1:bPb7nMRdOZYDrpPMTA3EInUQrdgoBinqUuSwlGdKDdE=
+github.com/klauspost/compress v1.11.1/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/compress v1.14.0 h1:PMLEuM4T/QQJPbP9IviImySv8Ehv/vc8pB+dBLUr+74=
+github.com/klauspost/compress v1.14.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
+github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -99,6 +112,8 @@
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
+github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
+github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -111,8 +126,9 @@
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
-github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index a8343ef..3af86c6 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -36,11 +36,23 @@
"github.com/apache/rocketmq-clients/golang/v5/metadata"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+ "github.com/klauspost/compress/zstd"
+ "github.com/pierrec/lz4"
"github.com/valyala/fastrand"
"go.opencensus.io/trace"
MD "google.golang.org/grpc/metadata"
)
+type CompressionType int32
+
+const (
+ Unknown CompressionType = 0
+ GZIP CompressionType = 1
+ Zlib CompressionType = 2
+ LZ4 CompressionType = 3
+ ZSTD CompressionType = 4
+)
+
func Mod(n int32, m int) int {
if int32(m) <= 0 {
return 0
@@ -141,14 +153,40 @@
return false
}
+func MatchCompressionAlgorithm(in []byte) CompressionType {
+ if in == nil {
+ return Unknown
+ }
+ if len(in) >= 2 {
+ if in[0] == 0x78 {
+ return Zlib
+ } else if in[0] == 0x1f && in[1] == 0x8b {
+ return GZIP
+ }
+ }
+ if len(in) >= 4 {
+ if in[0] == 0x04 && in[1] == 0x22 && in[2] == 0x4D && in[3] == 0x18 {
+ return LZ4
+ } else if in[0] == 0x28 && in[1] == 0xB5 && in[2] == 0x2F && in[3] == 0xFD {
+ return ZSTD
+ }
+ }
+ return Unknown
+}
+
func AutoDecode(in []byte) ([]byte, error) {
- if len(in) < 2 {
- return in, fmt.Errorf("unknown format")
- }
- if in[0] == 0x1f && in[1] == 0x8b {
+ compressionType := MatchCompressionAlgorithm(in)
+ switch compressionType {
+ case Zlib:
+ return ZlibDecode(in)
+ case GZIP:
return GZIPDecode(in)
+ case LZ4:
+ return Lz4Decode(in)
+ case ZSTD:
+ return ZstdDecode(in)
}
- return ZlibDecode(in)
+ return in, fmt.Errorf("unknown format")
}
func ZlibDecode(in []byte) ([]byte, error) {
@@ -161,6 +199,21 @@
return ioutil.ReadAll(reader)
}
+func Lz4Decode(in []byte) ([]byte, error) {
+ reader := lz4.NewReader(bytes.NewReader(in))
+ return ioutil.ReadAll(reader)
+}
+
+func ZstdDecode(in []byte) ([]byte, error) {
+ reader, err := zstd.NewReader(bytes.NewReader(in))
+ if err != nil {
+ var out []byte
+ return out, err
+ }
+ defer reader.Close()
+ return ioutil.ReadAll(reader)
+}
+
func GZIPDecode(in []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(in))
if err != nil {
diff --git a/golang/pkg/utils/utils_test.go b/golang/pkg/utils/utils_test.go
index 514f374..60d6a36 100644
--- a/golang/pkg/utils/utils_test.go
+++ b/golang/pkg/utils/utils_test.go
@@ -114,10 +114,10 @@
func TestAutoDecode(t *testing.T) {
_, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
- if err != zlib.ErrHeader {
+ if err == nil {
t.Error()
}
- _, err = AutoDecode([]byte{0})
+ _, err = AutoDecode([]byte{78, 0})
if err == nil {
t.Error()
}
@@ -137,6 +137,22 @@
if string(bytes) != "rocketmq-client-go" {
t.Error()
}
+ // lz4
+ bytes, err = AutoDecode([]byte{4, 34, 77, 24, 100, 112, 185, 18, 0, 0, 128, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111, 0, 0, 0, 0, 248, 183, 23, 47})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
+ // zstd
+ bytes, err = AutoDecode([]byte{40, 181, 47, 253, 32, 18, 145, 0, 0, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
}
func TestGZIPDecode(t *testing.T) {
@@ -167,6 +183,34 @@
}
}
+func TestLz4Decode(t *testing.T) {
+ _, err := Lz4Decode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+ if err == nil {
+ t.Error()
+ }
+ bytes, err := Lz4Decode([]byte{4, 34, 77, 24, 100, 112, 185, 18, 0, 0, 128, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111, 0, 0, 0, 0, 248, 183, 23, 47})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
+}
+
+func TestZstdDecode(t *testing.T) {
+ _, err := ZstdDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+ if err == nil {
+ t.Error()
+ }
+ bytes, err := ZstdDecode([]byte{40, 181, 47, 253, 32, 18, 145, 0, 0, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
+}
+
func TestSelectAnAddress(t *testing.T) {
if SelectAnAddress(nil) != nil {
t.Error()