Optimize gRPC Log reporter to set service name for the first element (#206)

diff --git a/CHANGES.md b/CHANGES.md
index b7dc0c8..0b6a902 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@
 * Add type name checking in ArgumentTypeNameMatch and ReturnTypeNameMatch
 * Highlight ArgumentTypeNameMatch and ReturnTypeNameMatch type naming rule in docs/en/setup/service-agent/java-agent/Java-Plugin-Development-Guide.md
 * Fix FileWriter scheduled task NPE
+* Optimize gRPC Log reporter to set service name for the first element in the streaming.(No change for Kafka reporter)
 
 #### Documentation
 
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
index 8db7fbd..6c313a7 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
@@ -21,7 +21,6 @@
 import io.grpc.Channel;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
-
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -42,10 +41,10 @@
 import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
 
 @DefaultImplementor
-public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData> {
+public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData.Builder> {
     private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class);
 
-    private volatile DataCarrier<LogData> carrier;
+    private volatile DataCarrier<LogData.Builder> carrier;
     private volatile GRPCChannelStatus status;
 
     private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub;
@@ -70,7 +69,7 @@
 
     }
 
-    public void produce(LogData logData) {
+    public void produce(LogData.Builder logData) {
         if (Objects.nonNull(logData) && !carrier.produce(logData)) {
             if (LOGGER.isDebugEnable()) {
                 LOGGER.debug("One log has been abandoned, cause by buffer is full.");
@@ -84,7 +83,7 @@
     }
 
     @Override
-    public void consume(final List<LogData> dataList) {
+    public void consume(final List<LogData.Builder> dataList) {
         if (CollectionUtil.isEmpty(dataList)) {
             return;
         }
@@ -95,31 +94,39 @@
             StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
                 .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                 .collect(
-                new StreamObserver<Commands>() {
-                    @Override
-                    public void onNext(final Commands commands) {
+                    new StreamObserver<Commands>() {
+                        @Override
+                        public void onNext(final Commands commands) {
 
-                    }
+                        }
 
-                    @Override
-                    public void onError(final Throwable throwable) {
-                        status.finished();
-                        LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
-                                     dataList.size()
-                        );
-                        ServiceManager.INSTANCE
-                            .findService(GRPCChannelManager.class)
-                            .reportError(throwable);
-                    }
+                        @Override
+                        public void onError(final Throwable throwable) {
+                            status.finished();
+                            LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
+                                         dataList.size()
+                            );
+                            ServiceManager.INSTANCE
+                                .findService(GRPCChannelManager.class)
+                                .reportError(throwable);
+                        }
 
-                    @Override
-                    public void onCompleted() {
-                        status.finished();
-                    }
-                });
+                        @Override
+                        public void onCompleted() {
+                            status.finished();
+                        }
+                    });
 
-            for (final LogData logData : dataList) {
-                logDataStreamObserver.onNext(logData);
+            boolean isFirst = true;
+            for (final LogData.Builder logData : dataList) {
+                if (isFirst) {
+                    // Only set service name of the first element in one stream
+                    // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
+                    // Log collecting protocol defines LogData#service is required in the first element only.
+                    logData.setService(Config.Agent.SERVICE_NAME);
+                    isFirst = false;
+                }
+                logDataStreamObserver.onNext(logData.build());
             }
             logDataStreamObserver.onCompleted();
             status.wait4Finish();
@@ -127,7 +134,7 @@
     }
 
     @Override
-    public void onError(final List<LogData> data, final Throwable t) {
+    public void onError(final List<LogData.Builder> data, final Throwable t) {
         LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size());
     }
 
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
index 642cbe8..ca3c3af 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
@@ -74,12 +74,11 @@
      *
      * @param appender the real {@link AppenderSkeleton appender}
      * @param event {@link LoggingEvent}
-     * @return {@link LogData} with filtered trace context in order to reduce the cost on the network
+     * @return {@link LogData.Builder} with filtered trace context in order to reduce the cost on the network
      */
-    private LogData transform(final AppenderSkeleton appender, LoggingEvent event) {
+    private LogData.Builder transform(final AppenderSkeleton appender, LoggingEvent event) {
         LogData.Builder builder = LogData.newBuilder()
                 .setTimestamp(event.getTimeStamp())
-                .setService(Config.Agent.SERVICE_NAME)
                 .setServiceInstance(Config.Agent.INSTANCE_NAME)
                 .setTraceContext(TraceContext.newBuilder()
                         .setTraceId(ContextManager.getGlobalTraceId())
@@ -102,12 +101,12 @@
             builder.setEndpoint(primaryEndpointName);
         }
 
-        return -1 == ContextManager.getSpanId() ? builder.build()
+        return -1 == ContextManager.getSpanId() ? builder
                 : builder.setTraceContext(TraceContext.newBuilder()
                         .setTraceId(ContextManager.getGlobalTraceId())
                         .setSpanId(ContextManager.getSpanId())
                         .setTraceSegmentId(ContextManager.getSegmentId())
-                        .build()).build();
+                        .build());
     }
 
     private String transformLogText(final AppenderSkeleton appender, final LoggingEvent event) {
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java
index 6e50f25..0b94b68 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java
@@ -80,7 +80,7 @@
      * @param event {@link LogEvent}
      * @return {@link LogData} with filtered trace context in order to reduce the cost on the network
      */
-    private LogData transform(final AbstractAppender appender, LogEvent event) {
+    private LogData.Builder transform(final AbstractAppender appender, LogEvent event) {
         LogTags.Builder logTags = LogTags.newBuilder()
                 .addData(KeyStringValuePair.newBuilder()
                         .setKey("level").setValue(event.getLevel().toString()).build())
@@ -123,14 +123,14 @@
                     .setTraceId(context.getTraceId())
                     .setSpanId(context.getSpanId())
                     .setTraceSegmentId(context.getTraceSegmentId())
-                    .build()).build();
+                    .build());
         } else {
-            return -1 == ContextManager.getSpanId() ? builder.build()
+            return -1 == ContextManager.getSpanId() ? builder
                     : builder.setTraceContext(TraceContext.newBuilder()
                     .setTraceId(ContextManager.getGlobalTraceId())
                     .setSpanId(ContextManager.getSpanId())
                     .setTraceSegmentId(ContextManager.getSegmentId())
-                    .build()).build();
+                    .build());
         }
     }
 
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java
index b81b36d..d1072c7 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java
@@ -80,7 +80,7 @@
      * @param event {@link ILoggingEvent}
      * @return {@link LogData} with filtered trace context in order to reduce the cost on the network
      */
-    private LogData transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
+    private LogData.Builder transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
         LogTags.Builder logTags = LogTags.newBuilder()
                 .addData(KeyStringValuePair.newBuilder()
                         .setKey("level").setValue(event.getLevel().toString()).build())
@@ -118,12 +118,12 @@
             builder.setEndpoint(primaryEndpointName);
         }
 
-        return -1 == ContextManager.getSpanId() ? builder.build()
+        return -1 == ContextManager.getSpanId() ? builder
                 : builder.setTraceContext(TraceContext.newBuilder()
                         .setTraceId(ContextManager.getGlobalTraceId())
                         .setSpanId(ContextManager.getSpanId())
                         .setTraceSegmentId(ContextManager.getSegmentId())
-                        .build()).build();
+                        .build());
     }
 
     private String transformLogText(final OutputStreamAppender<ILoggingEvent> appender, final ILoggingEvent event) {
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
index 5c65145..43b251e 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
 import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.conf.Config;
 import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
 import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
 import org.apache.skywalking.apm.network.logging.v3.LogData;
@@ -41,18 +42,21 @@
     }
 
     @Override
-    public void produce(final LogData logData) {
+    public void produce(final LogData.Builder logData) {
         super.produce(logData);
     }
 
     @Override
-    public void consume(final List<LogData> dataList) {
+    public void consume(final List<LogData.Builder> dataList) {
         if (producer == null || CollectionUtil.isEmpty(dataList)) {
             return;
         }
 
-        for (LogData data : dataList) {
-            producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.toByteArray())));
+        for (LogData.Builder data : dataList) {
+            // Kafka Log reporter sends one log per time.
+            // Every time, service name should be set to keep data integrity.
+            data.setService(Config.Agent.SERVICE_NAME);
+            producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.build().toByteArray())));
         }
     }