Fix metadata messed up when transferring Log data (#115)

diff --git a/CHANGES.md b/CHANGES.md
index b37b862..78472b1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,16 +2,13 @@
 ==================
 Release Notes.
 
-1.0.0
+1.1.0
 ------------------
 #### Features
-* Add the compat protocol receiver for the old version of agents.
-* Support transmit the native eBPF Process and Profiling protocol.
-* Change the name of plugin that is not well-named.
 
 #### Bug Fixes
-* Fix Metadata lost in the Native Meter protocol.
+* Fix metadata messed up when transferring Log data.
 
 #### Issues and PR
-- All issues are [here](https://github.com/apache/skywalking/milestone/115?closed=1)
-- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.0.0+is%3Aclosed)
\ No newline at end of file
+- All issues are [here](https://github.com/apache/skywalking/milestone/137?closed=1)
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.1.0+is%3Aclosed)
\ No newline at end of file
diff --git a/changes/changes-1.0.0.md b/changes/changes-1.0.0.md
new file mode 100644
index 0000000..b37b862
--- /dev/null
+++ b/changes/changes-1.0.0.md
@@ -0,0 +1,17 @@
+Changes by Version
+==================
+Release Notes.
+
+1.0.0
+------------------
+#### Features
+* Add the compat protocol receiver for the old version of agents.
+* Support transmit the native eBPF Process and Profiling protocol.
+* Change the name of plugin that is not well-named.
+
+#### Bug Fixes
+* Fix Metadata lost in the Native Meter protocol.
+
+#### Issues and PR
+- All issues are [here](https://github.com/apache/skywalking/milestone/115?closed=1)
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.0.0+is%3Aclosed)
\ No newline at end of file
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 09dbfc9..d1d6d9c 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -155,7 +155,7 @@
       # The maximum buffer elements.
       max_buffer_size: ${SATELLITE_LOGPIPE_SENDER_MAX_BUFFER_SIZE:200}
       # The minimum flush elements.
-      min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:100}
+      min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:1}
       client_name: grpc-client
       forwarders:
         - plugin_name: native-log-grpc-forwarder
diff --git a/go.mod b/go.mod
index b22cdef..6f0147b 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 	gotest.tools v2.2.0+incompatible
 	k8s.io/apimachinery v0.20.6
-	skywalking.apache.org/repo/goapi v0.0.0-20220513074115-4af2c2d37d2f
+	skywalking.apache.org/repo/goapi v0.0.0-20220615082501-7d36e7c0c3c9
 )
 
 require (
diff --git a/go.sum b/go.sum
index ecd6e43..6d0e9b9 100644
--- a/go.sum
+++ b/go.sum
@@ -1724,6 +1724,6 @@
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20220513074115-4af2c2d37d2f h1:InBru/3MVpcVoGPGRjY+A5LpAi992E37j6dCvvNqF/w=
-skywalking.apache.org/repo/goapi v0.0.0-20220513074115-4af2c2d37d2f/go.mod h1:uWwwvhcwe2MD/nJCg0c1EE/eL6KzaBosLHDfMFoEJ30=
+skywalking.apache.org/repo/goapi v0.0.0-20220615082501-7d36e7c0c3c9 h1:aeZds1YLgbt7IJQEub1cBX97jWqFbI8J1Vjscg9obkI=
+skywalking.apache.org/repo/goapi v0.0.0-20220615082501-7d36e7c0c3c9/go.mod h1:uWwwvhcwe2MD/nJCg0c1EE/eL6KzaBosLHDfMFoEJ30=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
index 482574c..1dcddcc 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -181,7 +181,7 @@
 				},
 				FlushTime:      1000,
 				MaxBufferSize:  200,
-				MinFlushEvents: 100,
+				MinFlushEvents: 1,
 				ClientName:     "grpc-client",
 				ForwardersConfig: []plugin.Config{
 					{
diff --git a/plugins/forwarder/grpc/nativelog/forwarder.go b/plugins/forwarder/grpc/nativelog/forwarder.go
index 779eb93..a0179b5 100644
--- a/plugins/forwarder/grpc/nativelog/forwarder.go
+++ b/plugins/forwarder/grpc/nativelog/forwarder.go
@@ -72,37 +72,43 @@
 }
 
 func (f *Forwarder) Forward(batch event.BatchEvents) error {
-	stream, err := f.logClient.Collect(context.Background())
-	if err != nil {
-		log.Logger.Errorf("open grpc stream error %v", err)
-		return err
-	}
 	for _, e := range batch {
-		data, ok := e.GetData().(*v1.SniffData_Log)
+		data, ok := e.GetData().(*v1.SniffData_LogList)
 		if !ok {
 			continue
 		}
-		err := stream.SendMsg(server_grpc.NewOriginalData(data.Log))
+		stream, err := f.logClient.Collect(context.Background())
 		if err != nil {
-			log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
-			err = closeStream(stream)
-			if err != nil {
-				log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
-			}
+			log.Logger.Errorf("open grpc stream error %v", err)
 			return err
 		}
-	}
-	return closeStream(stream)
-}
+		streamClosed := false
+		for _, logData := range data.LogList.Logs {
+			err := stream.SendMsg(server_grpc.NewOriginalData(logData))
+			if err != nil {
+				log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
+				f.closeStream(stream)
+				streamClosed = true
+				break
+			}
+		}
 
-func closeStream(stream logging.LogReportService_CollectClient) error {
-	_, err := stream.CloseAndRecv()
-	if err != nil && err != io.EOF {
-		return err
+		if !streamClosed {
+			f.closeStream(stream)
+		}
 	}
 	return nil
 }
 
+func (f *Forwarder) closeStream(stream logging.LogReportService_CollectClient) {
+	_, err := stream.CloseAndRecv()
+	if err != nil && err != io.EOF {
+		if err != nil {
+			log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
+		}
+	}
+}
+
 func (f *Forwarder) ForwardType() v1.SniffType {
 	return v1.SniffType_Logging
 }
diff --git a/plugins/forwarder/kafka/nativelog/forwarder.go b/plugins/forwarder/kafka/nativelog/forwarder.go
index b13e24e..caa4612 100644
--- a/plugins/forwarder/kafka/nativelog/forwarder.go
+++ b/plugins/forwarder/kafka/nativelog/forwarder.go
@@ -76,14 +76,16 @@
 func (f *Forwarder) Forward(batch event.BatchEvents) error {
 	var message []*sarama.ProducerMessage
 	for _, e := range batch {
-		data, ok := e.GetData().(*v1.SniffData_Log)
+		data, ok := e.GetData().(*v1.SniffData_LogList)
 		if !ok {
 			continue
 		}
-		message = append(message, &sarama.ProducerMessage{
-			Topic: f.Topic,
-			Value: sarama.ByteEncoder(data.Log),
-		})
+		for _, l := range data.LogList.Logs {
+			message = append(message, &sarama.ProducerMessage{
+				Topic: f.Topic,
+				Value: sarama.ByteEncoder(l),
+			})
+		}
 	}
 	return f.producer.SendMessages(message)
 }
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index d881cc1..47d8cb7 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -99,8 +99,8 @@
 			},
 			Type:   v1.SniffType_Logging,
 			Remote: true,
-			Data: &v1.SniffData_Log{
-				Log: logBytes,
+			Data: &v1.SniffData_LogList{
+				LogList: &v1.BatchLogList{Logs: [][]byte{logBytes}},
 			},
 		},
 		)
@@ -149,8 +149,8 @@
 		},
 		Type:   v1.SniffType_Logging,
 		Remote: true,
-		Data: &v1.SniffData_Log{
-			Log: logBytes,
+		Data: &v1.SniffData_LogList{
+			LogList: &v1.BatchLogList{Logs: [][]byte{logBytes}},
 		},
 	}
 }
@@ -332,7 +332,7 @@
 		}
 	}
 	want := []int32{
-		1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 6, 6, 7, 7, 8, 5,
+		1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 5, 6, 6, 7, 7, 8, 5,
 	}
 	if !cmp.Equal(want, memcost) {
 		t.Fatalf("the memory cost trends are not in line with expectations,\n want: %v,\n but got: %v", want, memcost)
diff --git a/plugins/receiver/grpc/nativelog/log_report_service.go b/plugins/receiver/grpc/nativelog/log_report_service.go
index 6670f81..ab47404 100644
--- a/plugins/receiver/grpc/nativelog/log_report_service.go
+++ b/plugins/receiver/grpc/nativelog/log_report_service.go
@@ -36,25 +36,37 @@
 }
 
 func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer) error {
+	dataList := make([][]byte, 0)
+	originalData := grpc.NewOriginalData(nil)
 	for {
-		originalData := grpc.NewOriginalData(nil)
 		err := stream.RecvMsg(originalData)
 		if err == io.EOF {
+			s.flushLogs(dataList)
 			return stream.SendAndClose(&common.Commands{})
 		}
 		if err != nil {
+			s.flushLogs(dataList)
 			return err
 		}
-		e := &v1.SniffData{
-			Name:      eventName,
-			Timestamp: time.Now().UnixNano() / 1e6,
-			Meta:      nil,
-			Type:      v1.SniffType_Logging,
-			Remote:    true,
-			Data: &v1.SniffData_Log{
-				Log: originalData.Content,
-			},
-		}
-		s.receiveChannel <- e
+		dataList = append(dataList, originalData.Content)
 	}
 }
+
+func (s *LogReportService) flushLogs(dataList [][]byte) {
+	if len(dataList) == 0 {
+		return
+	}
+	e := &v1.SniffData{
+		Name:      eventName,
+		Timestamp: time.Now().UnixNano() / 1e6,
+		Meta:      nil,
+		Type:      v1.SniffType_Logging,
+		Remote:    true,
+		Data: &v1.SniffData_LogList{
+			LogList: &v1.BatchLogList{
+				Logs: dataList,
+			},
+		},
+	}
+	s.receiveChannel <- e
+}
diff --git a/plugins/receiver/grpc/nativelog/receiver_test.go b/plugins/receiver/grpc/nativelog/receiver_test.go
index 97241e8..1eda45a 100644
--- a/plugins/receiver/grpc/nativelog/receiver_test.go
+++ b/plugins/receiver/grpc/nativelog/receiver_test.go
@@ -52,7 +52,7 @@
 		return data.String()
 	}, func(data *v1.SniffData) string {
 		d := new(logging.LogData)
-		_ = proto.Unmarshal(data.GetLog(), d)
+		_ = proto.Unmarshal(data.GetLogList().Logs[0], d)
 		return d.String()
 	}, t)
 }
diff --git a/plugins/receiver/http/nativcelog/receiver.go b/plugins/receiver/http/nativcelog/receiver.go
index aef9b2e..ca51ecd 100644
--- a/plugins/receiver/http/nativcelog/receiver.go
+++ b/plugins/receiver/http/nativcelog/receiver.go
@@ -120,8 +120,10 @@
 			Meta:      nil,
 			Type:      v1.SniffType_Logging,
 			Remote:    true,
-			Data: &v1.SniffData_Log{
-				Log: b,
+			Data: &v1.SniffData_LogList{
+				LogList: &v1.BatchLogList{
+					Logs: [][]byte{b},
+				},
 			},
 		}
 		r.OutputChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver_test.go b/plugins/receiver/http/nativcelog/receiver_test.go
index 5cfb1cf..0549445 100644
--- a/plugins/receiver/http/nativcelog/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -84,7 +84,7 @@
 
 		newData := <-r.Channel()
 		d := new(logging.LogData)
-		_ = proto.Unmarshal(newData.Data.(*v1.SniffData_Log).Log, d)
+		_ = proto.Unmarshal(newData.Data.(*v1.SniffData_LogList).LogList.Logs[0], d)
 		if !cmp.Equal(d.String(), data.String()) {
 			t.Fatalf("the sent data is not equal to the received data\n, "+
 				"want data %s\n, but got %s\n", data.String(), newData.String())