feature: add support for segmentio-kafka (#176)

diff --git a/.github/workflows/plugin-tests.yaml b/.github/workflows/plugin-tests.yaml
index 737184d..c58c498 100644
--- a/.github/workflows/plugin-tests.yaml
+++ b/.github/workflows/plugin-tests.yaml
@@ -99,6 +99,7 @@
           - rocketmq
           - amqp
           - pulsar
+          - segmentio-kafka
     steps:
       - uses: actions/checkout@v2
         with:
diff --git a/CHANGES.md b/CHANGES.md
index 467c25f..f823c49 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@
 ------------------
 #### Plugins
 * Support [Pulsar](https://github.com/apache/pulsar-client-go) MQ.
+* Support [Segmentio-Kafka](https://github.com/segmentio/kafka-go) MQ.
 
 0.4.0
 ------------------
diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md
index 92502f1..3eb8dbc 100644
--- a/docs/en/agent/support-plugins.md
+++ b/docs/en/agent/support-plugins.md
@@ -31,6 +31,7 @@
   * `rocketMQ`: [rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested v2.1.2.
   * `amqp`: [AMQP](https://github.com/rabbitmq/amqp091-go) tested v1.9.0.
   * `pulsar`: [pulsar-client-go](https://github.com/apache/pulsar-client-go) tested v0.12.0.
+  * `segmentio-kafka`: [segmentio-kafka](https://github.com/segmentio/kafka-go) tested v0.4.47.
 
 # Metrics Plugins
 The meter plugin provides the advanced metrics collections.
diff --git a/go.work b/go.work
index fd0ce8b..29fd8a3 100644
--- a/go.work
+++ b/go.work
@@ -26,6 +26,7 @@
 	./plugins/rocketmq
 	./plugins/amqp
 	./plugins/pulsar
+	./plugins/segmentio-kafka
 
 	./test/benchmark-codebase/consumer
 	./test/benchmark-codebase/provider
@@ -58,6 +59,7 @@
 	./test/plugins/scenarios/rocketmq
 	./test/plugins/scenarios/amqp
 	./test/plugins/scenarios/pulsar
+	./test/plugins/scenarios/segmentio-kafka
 
 	./tools/go-agent
 
diff --git a/plugins/segmentio-kafka/go.mod b/plugins/segmentio-kafka/go.mod
new file mode 100644
index 0000000..cd8cffb
--- /dev/null
+++ b/plugins/segmentio-kafka/go.mod
@@ -0,0 +1,9 @@
+module github.com/apache/skywalking-go/plugins/segmentio-kafka
+
+go 1.18
+
+require (
+	github.com/klauspost/compress v1.15.9 // indirect
+	github.com/pierrec/lz4/v4 v4.1.15 // indirect
+	github.com/segmentio/kafka-go v0.4.47 // indirect
+)
diff --git a/plugins/segmentio-kafka/go.sum b/plugins/segmentio-kafka/go.sum
new file mode 100644
index 0000000..cf392a8
--- /dev/null
+++ b/plugins/segmentio-kafka/go.sum
@@ -0,0 +1,59 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/plugins/segmentio-kafka/instrument.go b/plugins/segmentio-kafka/instrument.go
new file mode 100644
index 0000000..a7ff008
--- /dev/null
+++ b/plugins/segmentio-kafka/instrument.go
@@ -0,0 +1,78 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segmentiokafka
+
+import (
+	"embed"
+
+	"github.com/apache/skywalking-go/plugins/core/instrument"
+)
+
+//go:embed *
+var fs embed.FS
+
+//skywalking:nocopy
+type Instrument struct {
+}
+
+func NewInstrument() *Instrument {
+	return &Instrument{}
+}
+
+func (i *Instrument) Name() string {
+	return "segmentio_kafka"
+}
+
+func (i *Instrument) BasePackage() string {
+	return "github.com/segmentio/kafka-go"
+}
+
+func (i *Instrument) VersionChecker(version string) bool {
+	return true
+}
+
+func (i *Instrument) Points() []*instrument.Point {
+	return []*instrument.Point{
+		{
+			PackageName: "kafka",
+			At: instrument.NewMethodEnhance("*Writer", "WriteMessages",
+				instrument.WithArgsCount(2),
+				instrument.WithArgType(0, "context.Context"),
+				instrument.WithArgType(1, "...Message"),
+				instrument.WithResultCount(1),
+				instrument.WithResultType(0, "error"),
+			),
+			Interceptor: "WriterInterceptor",
+		},
+		{
+			PackageName: "kafka",
+			At: instrument.NewMethodEnhance("*Reader", "ReadMessage",
+				instrument.WithArgsCount(1),
+				instrument.WithArgType(0, "context.Context"),
+				instrument.WithResultCount(2),
+				instrument.WithResultType(0, "Message"),
+				instrument.WithResultType(1, "error"),
+			),
+			Interceptor: "ReaderInterceptor",
+		},
+	}
+}
+
+func (i *Instrument) FS() *embed.FS {
+	return &fs
+}
diff --git a/plugins/segmentio-kafka/reader_interceptor.go b/plugins/segmentio-kafka/reader_interceptor.go
new file mode 100644
index 0000000..4a876b4
--- /dev/null
+++ b/plugins/segmentio-kafka/reader_interceptor.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segmentiokafka
+
+import (
+	"strings"
+
+	"github.com/segmentio/kafka-go"
+
+	"github.com/apache/skywalking-go/plugins/core/operator"
+	"github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+	kafkaReaderPrefix      = "Kafka/"
+	kafkaReaderSuffix      = "/Consumer"
+	kafkaReaderComponentID = 41
+	semicolon              = ";"
+)
+
+type ReaderInterceptor struct {
+}
+
+func (r *ReaderInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+	return nil
+}
+
+func (r *ReaderInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error {
+	reader := invocation.CallerInstance().(*kafka.Reader)
+	brokers := strings.Join(reader.Config().Brokers, semicolon)
+	message := result[0].(kafka.Message)
+	topic := message.Topic
+	operationName := kafkaReaderPrefix + topic + kafkaReaderSuffix
+
+	span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) {
+		for _, header := range message.Headers {
+			if header.Key == headerKey {
+				return string(header.Value), nil
+			}
+		}
+		return "", nil
+	},
+		tracing.WithLayer(tracing.SpanLayerMQ),
+		tracing.WithComponent(kafkaReaderComponentID),
+		tracing.WithTag(tracing.TagMQBroker, brokers),
+		tracing.WithTag(tracing.TagMQTopic, topic),
+	)
+	if err != nil {
+		return err
+	}
+
+	if err, ok := result[1].(error); ok {
+		span.Tag(tracing.TagMQStatus, err.Error())
+		span.Error(err.Error())
+	}
+	span.SetPeer(brokers)
+	span.End()
+	return nil
+}
diff --git a/plugins/segmentio-kafka/writer_interceptor.go b/plugins/segmentio-kafka/writer_interceptor.go
new file mode 100644
index 0000000..78d93d9
--- /dev/null
+++ b/plugins/segmentio-kafka/writer_interceptor.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segmentiokafka
+
+import (
+	"github.com/segmentio/kafka-go"
+
+	"github.com/apache/skywalking-go/plugins/core/operator"
+	"github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+	kafkaWriterPrefix      = "Kafka/"
+	kafkaWriterSuffix      = "/Producer"
+	kafkaWriterComponentID = 40
+)
+
+type WriterInterceptor struct {
+}
+
+func (w *WriterInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+	writer := invocation.CallerInstance().(*kafka.Writer)
+	addr, topic := writer.Addr.String(), writer.Topic
+	messageList := invocation.Args()[1].([]kafka.Message)
+	operationName := kafkaWriterPrefix + topic + kafkaWriterSuffix
+
+	span, err := tracing.CreateExitSpan(operationName, addr, func(headerKey, headerValue string) error {
+		for idx := range messageList {
+			if len(messageList[idx].Headers) == 0 {
+				messageList[idx].Headers = []kafka.Header{
+					{Key: headerKey, Value: []byte(headerValue)},
+				}
+			} else {
+				messageList[idx].Headers = append(messageList[idx].Headers,
+					kafka.Header{Key: headerKey, Value: []byte(headerValue)})
+			}
+		}
+		return nil
+	},
+		tracing.WithLayer(tracing.SpanLayerMQ),
+		tracing.WithComponent(kafkaWriterComponentID),
+		tracing.WithTag(tracing.TagMQBroker, addr),
+		tracing.WithTag(tracing.TagMQTopic, topic),
+	)
+	if err != nil {
+		return err
+	}
+
+	span.SetPeer(addr)
+	invocation.SetContext(span)
+	return nil
+}
+
+func (w *WriterInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error {
+	if invocation.GetContext() == nil {
+		return nil
+	}
+	span := invocation.GetContext().(tracing.Span)
+	if err, ok := result[0].(error); ok && err != nil {
+		span.Tag(tracing.TagMQStatus, err.Error())
+		span.Error(err.Error())
+	}
+	span.End()
+	return nil
+}
diff --git a/test/plugins/scenarios/segmentio-kafka/bin/startup.sh b/test/plugins/scenarios/segmentio-kafka/bin/startup.sh
new file mode 100755
index 0000000..e758869
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/bin/startup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+home="$(cd "$(dirname $0)"; pwd)"
+go build ${GO_BUILD_OPTS} -o kafka
+
+./kafka
\ No newline at end of file
diff --git a/test/plugins/scenarios/segmentio-kafka/config/excepted.yml b/test/plugins/scenarios/segmentio-kafka/config/excepted.yml
new file mode 100644
index 0000000..bb4f7b7
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/config/excepted.yml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+segmentItems:
+  - serviceName: segmentio-kafka
+    segmentSize: 3
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Kafka/sw-topic/Producer
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 40
+            isError: false
+            spanType: Exit
+            peer: kafka-server:9092
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+          - operationName: GET:/execute
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 5004
+            isError: false
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
+            tags:
+              - { key: http.method, value: GET }
+              - { key: url, value: 'service:8080/execute' }
+              - { key: status_code, value: '200' }
+      - segmentId: not null
+        spans:
+          - operationName: Kafka/sw-topic/Consumer
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 41
+            isError: false
+            spanType: Entry
+            peer: kafka-server:9092
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/execute', networkAddress: 'kafka-server:9092',
+                  refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: segmentio-kafka,
+                  traceId: not null }
+meterItems: [ ]
+logItems: [ ]
\ No newline at end of file
diff --git a/test/plugins/scenarios/segmentio-kafka/go.mod b/test/plugins/scenarios/segmentio-kafka/go.mod
new file mode 100644
index 0000000..0b839b5
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/go.mod
@@ -0,0 +1,9 @@
+module test/plugins/scenarios/segmentio-kafka
+
+go 1.18
+
+require (
+	github.com/klauspost/compress v1.15.9 // indirect
+	github.com/pierrec/lz4/v4 v4.1.15 // indirect
+	github.com/segmentio/kafka-go v0.4.47 // indirect
+)
diff --git a/test/plugins/scenarios/segmentio-kafka/go.sum b/test/plugins/scenarios/segmentio-kafka/go.sum
new file mode 100644
index 0000000..cf392a8
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/go.sum
@@ -0,0 +1,59 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/plugins/scenarios/segmentio-kafka/main.go b/test/plugins/scenarios/segmentio-kafka/main.go
new file mode 100644
index 0000000..58df214
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/main.go
@@ -0,0 +1,148 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+
+	_ "github.com/apache/skywalking-go"
+)
+
+type testFunc func() error
+
+var (
+	url    = "kafka-server:9092"
+	topic  = "sw-topic"
+	msg    = "I love skywalking 3 thousand"
+	ctx    = context.Background()
+	writer *kafka.Writer
+	reader *kafka.Reader
+)
+
+func main() {
+	writer = newKafkaWriter(topic)
+	defer writer.Close()
+	reader = newKafkaReader()
+	defer reader.Close()
+	consumerHelper()
+
+	route := http.NewServeMux()
+	route.HandleFunc("/execute", func(res http.ResponseWriter, req *http.Request) {
+		testProduceConsume()
+		_, _ = res.Write([]byte("execute success"))
+	})
+	route.HandleFunc("/health", func(writer http.ResponseWriter, request *http.Request) {
+		_, _ = writer.Write([]byte("ok"))
+	})
+	fmt.Println("start client")
+	err := http.ListenAndServe(":8080", route)
+	if err != nil {
+		fmt.Printf("client start error: %v \n", err)
+	}
+}
+
+func testProduceConsume() {
+	tests := []struct {
+		name string
+		fn   testFunc
+	}{
+		{"simpleMsg", simpleMsg},
+	}
+	for _, test := range tests {
+		fmt.Printf("excute test case: %s\n", test.name)
+		if subErr := test.fn(); subErr != nil {
+			fmt.Printf("test case %s failed: %v", test.name, subErr)
+		}
+	}
+}
+
+func simpleMsg() error {
+	if err := writer.WriteMessages(ctx, kafka.Message{
+		Value: []byte(msg),
+	}); err != nil {
+		log.Println("simpleMsg WriteMessages error")
+		return err
+	}
+	return nil
+}
+
+func consumerHelper() {
+	go func() {
+		for {
+			if message, err := reader.ReadMessage(ctx); err != nil {
+				log.Fatal("consumer msg error: ", err)
+			} else {
+				fmt.Printf("consumer|topic=%s, partition=%d, offset=%d, key=%s, value=%s, header=%s\n",
+					message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value), message.Headers)
+			}
+		}
+	}()
+}
+
+func newKafkaReader() *kafka.Reader {
+	return kafka.NewReader(kafka.ReaderConfig{
+		Brokers:        []string{url},
+		Topic:          topic,
+		CommitInterval: 1 * time.Second,
+	})
+}
+
+func newKafkaWriter(topic string) *kafka.Writer {
+	createTopic()
+	return &kafka.Writer{
+		Addr:  kafka.TCP(url),
+		Topic: topic,
+	}
+}
+
+func createTopic() {
+	conn, err := kafka.Dial("tcp", url)
+	if err != nil {
+		log.Fatal(fmt.Errorf("createTopic, Dial: %w", err))
+	}
+	defer conn.Close()
+	controller, err := conn.Controller()
+	if err != nil {
+		err = fmt.Errorf("createTopic, conn.Controller: %w", err)
+		log.Fatal(err)
+	}
+	conn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
+	if err != nil {
+		log.Fatal("kafka.Dial error: ", err)
+	}
+	conn.SetDeadline(time.Now().Add(time.Second))
+	topicConfigs := []kafka.TopicConfig{
+		{
+			Topic:             topic,
+			NumPartitions:     1,
+			ReplicationFactor: 1,
+		},
+	}
+	err = conn.CreateTopics(topicConfigs...)
+	if err != nil {
+		log.Fatal(fmt.Errorf("createTopic error: %w", err))
+	}
+}
diff --git a/test/plugins/scenarios/segmentio-kafka/plugin.yml b/test/plugins/scenarios/segmentio-kafka/plugin.yml
new file mode 100644
index 0000000..887e422
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/plugin.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute
+health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health
+start-script: ./bin/startup.sh
+framework: github.com/segmentio/kafka-go
+export-port: 8080
+support-version:
+  - go: 1.18
+    framework:
+      - v0.4.47
+dependencies:
+  zookeeper-server:
+    image: zookeeper:3.9.2
+    hostname: zookeeper-server
+  kafka-server:
+    image: bitnami/kafka:3.7.0
+    hostname: kafka-server
+    ports:
+      - 9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-server:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LISTENERS: "PLAINTEXT://kafka-server:9092"
+    depends_on:
+      - zookeeper-server
\ No newline at end of file
diff --git a/tools/go-agent/instrument/plugins/register.go b/tools/go-agent/instrument/plugins/register.go
index 7726b41..2ed4857 100644
--- a/tools/go-agent/instrument/plugins/register.go
+++ b/tools/go-agent/instrument/plugins/register.go
@@ -41,6 +41,7 @@
 	"github.com/apache/skywalking-go/plugins/pulsar"
 	"github.com/apache/skywalking-go/plugins/rocketmq"
 	runtime_metrics "github.com/apache/skywalking-go/plugins/runtimemetrics"
+	segmentiokafka "github.com/apache/skywalking-go/plugins/segmentio-kafka"
 	sql_entry "github.com/apache/skywalking-go/plugins/sql/entry"
 	sql_mysql "github.com/apache/skywalking-go/plugins/sql/mysql"
 )
@@ -66,6 +67,7 @@
 	registerFramework(rocketmq.NewInstrument())
 	registerFramework(amqp.NewInstrument())
 	registerFramework(pulsar.NewInstrument())
+	registerFramework(segmentiokafka.NewInstrument())
 
 	// fasthttp related instruments
 	registerFramework(fasthttp_client.NewInstrument())