golang: support LZ4 and ZSTD compression algorithm (#958)

* support LZ4 and ZSTD compression algorithm

* update zstd

* update go mod

---------

Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
diff --git a/golang/go.mod b/golang/go.mod
index 921328b..30efe3c 100644
--- a/golang/go.mod
+++ b/golang/go.mod
@@ -6,7 +6,9 @@
 	github.com/go-playground/validator/v10 v10.11.0
 	github.com/golang/mock v1.6.0
 	github.com/google/uuid v1.3.0
+	github.com/klauspost/compress v1.16.7
 	github.com/natefinch/lumberjack v2.0.0+incompatible
+	github.com/pierrec/lz4 v2.6.1+incompatible
 	github.com/prashantv/gostub v1.1.0
 	github.com/stretchr/testify v1.7.1
 	go.uber.org/zap v1.21.0
@@ -19,8 +21,9 @@
 require (
 	github.com/benbjohnson/clock v1.3.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/frankban/quicktest v1.14.6 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
-	github.com/google/go-cmp v0.5.8 // indirect
+	github.com/google/go-cmp v0.5.9 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -36,7 +39,7 @@
 	github.com/dchest/siphash v1.2.3
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
-	github.com/golang/protobuf v1.5.2 // indirect
+	github.com/golang/protobuf v1.5.2
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/valyala/fastrand v1.1.0
 	go.opencensus.io v0.22.5
diff --git a/golang/go.sum b/golang/go.sum
index 2cad4a7..8e74118 100644
--- a/golang/go.sum
+++ b/golang/go.sum
@@ -32,6 +32,8 @@
 github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
+github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
@@ -72,8 +74,8 @@
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
-github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -87,10 +89,21 @@
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/klauspost/compress v1.11.1 h1:bPb7nMRdOZYDrpPMTA3EInUQrdgoBinqUuSwlGdKDdE=
+github.com/klauspost/compress v1.11.1/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/compress v1.14.0 h1:PMLEuM4T/QQJPbP9IviImySv8Ehv/vc8pB+dBLUr+74=
+github.com/klauspost/compress v1.14.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
+github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
 github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -99,6 +112,8 @@
 github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
 github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
 github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
+github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
+github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -111,8 +126,9 @@
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
-github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
 github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index a8343ef..3af86c6 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -36,11 +36,23 @@
 
 	"github.com/apache/rocketmq-clients/golang/v5/metadata"
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+	"github.com/klauspost/compress/zstd"
+	"github.com/pierrec/lz4"
 	"github.com/valyala/fastrand"
 	"go.opencensus.io/trace"
 	MD "google.golang.org/grpc/metadata"
 )
 
+type CompressionType int32
+
+const (
+	Unknown CompressionType = 0
+	GZIP    CompressionType = 1
+	Zlib    CompressionType = 2
+	LZ4     CompressionType = 3
+	ZSTD    CompressionType = 4
+)
+
 func Mod(n int32, m int) int {
 	if int32(m) <= 0 {
 		return 0
@@ -141,14 +153,40 @@
 	return false
 }
 
+func MatchCompressionAlgorithm(in []byte) CompressionType {
+	if in == nil {
+		return Unknown
+	}
+	if len(in) >= 2 {
+		if in[0] == 0x78 {
+			return Zlib
+		} else if in[0] == 0x1f && in[1] == 0x8b {
+			return GZIP
+		}
+	}
+	if len(in) >= 4 {
+		if in[0] == 0x04 && in[1] == 0x22 && in[2] == 0x4D && in[3] == 0x18 {
+			return LZ4
+		} else if in[0] == 0x28 && in[1] == 0xB5 && in[2] == 0x2F && in[3] == 0xFD {
+			return ZSTD
+		}
+	}
+	return Unknown
+}
+
 func AutoDecode(in []byte) ([]byte, error) {
-	if len(in) < 2 {
-		return in, fmt.Errorf("unknown format")
-	}
-	if in[0] == 0x1f && in[1] == 0x8b {
+	compressionType := MatchCompressionAlgorithm(in)
+	switch compressionType {
+	case Zlib:
+		return ZlibDecode(in)
+	case GZIP:
 		return GZIPDecode(in)
+	case LZ4:
+		return Lz4Decode(in)
+	case ZSTD:
+		return ZstdDecode(in)
 	}
-	return ZlibDecode(in)
+	return in, fmt.Errorf("unknown format")
 }
 
 func ZlibDecode(in []byte) ([]byte, error) {
@@ -161,6 +199,21 @@
 	return ioutil.ReadAll(reader)
 }
 
+func Lz4Decode(in []byte) ([]byte, error) {
+	reader := lz4.NewReader(bytes.NewReader(in))
+	return ioutil.ReadAll(reader)
+}
+
+func ZstdDecode(in []byte) ([]byte, error) {
+	reader, err := zstd.NewReader(bytes.NewReader(in))
+	if err != nil {
+		var out []byte
+		return out, err
+	}
+	defer reader.Close()
+	return ioutil.ReadAll(reader)
+}
+
 func GZIPDecode(in []byte) ([]byte, error) {
 	reader, err := gzip.NewReader(bytes.NewReader(in))
 	if err != nil {
diff --git a/golang/pkg/utils/utils_test.go b/golang/pkg/utils/utils_test.go
index 514f374..60d6a36 100644
--- a/golang/pkg/utils/utils_test.go
+++ b/golang/pkg/utils/utils_test.go
@@ -114,10 +114,10 @@
 
 func TestAutoDecode(t *testing.T) {
 	_, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
-	if err != zlib.ErrHeader {
+	if err == nil {
 		t.Error()
 	}
-	_, err = AutoDecode([]byte{0})
+	_, err = AutoDecode([]byte{78, 0})
 	if err == nil {
 		t.Error()
 	}
@@ -137,6 +137,22 @@
 	if string(bytes) != "rocketmq-client-go" {
 		t.Error()
 	}
+	// lz4
+	bytes, err = AutoDecode([]byte{4, 34, 77, 24, 100, 112, 185, 18, 0, 0, 128, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111, 0, 0, 0, 0, 248, 183, 23, 47})
+	if err != nil {
+		t.Error()
+	}
+	if string(bytes) != "rocketmq-client-go" {
+		t.Error()
+	}
+	// zstd
+	bytes, err = AutoDecode([]byte{40, 181, 47, 253, 32, 18, 145, 0, 0, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111})
+	if err != nil {
+		t.Error()
+	}
+	if string(bytes) != "rocketmq-client-go" {
+		t.Error()
+	}
 }
 
 func TestGZIPDecode(t *testing.T) {
@@ -167,6 +183,34 @@
 	}
 }
 
+func TestLz4Decode(t *testing.T) {
+	_, err := Lz4Decode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+	if err == nil {
+		t.Error()
+	}
+	bytes, err := Lz4Decode([]byte{4, 34, 77, 24, 100, 112, 185, 18, 0, 0, 128, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111, 0, 0, 0, 0, 248, 183, 23, 47})
+	if err != nil {
+		t.Error()
+	}
+	if string(bytes) != "rocketmq-client-go" {
+		t.Error()
+	}
+}
+
+func TestZstdDecode(t *testing.T) {
+	_, err := ZstdDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+	if err == nil {
+		t.Error()
+	}
+	bytes, err := ZstdDecode([]byte{40, 181, 47, 253, 32, 18, 145, 0, 0, 114, 111, 99, 107, 101, 116, 109, 113, 45, 99, 108, 105, 101, 110, 116, 45, 103, 111})
+	if err != nil {
+		t.Error()
+	}
+	if string(bytes) != "rocketmq-client-go" {
+		t.Error()
+	}
+}
+
 func TestSelectAnAddress(t *testing.T) {
 	if SelectAnAddress(nil) != nil {
 		t.Error()