Optimize gRPC Log reporter to set service name for the first element (#206)
diff --git a/CHANGES.md b/CHANGES.md
index b7dc0c8..0b6a902 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@
* Add type name checking in ArgumentTypeNameMatch and ReturnTypeNameMatch
* Highlight ArgumentTypeNameMatch and ReturnTypeNameMatch type naming rule in docs/en/setup/service-agent/java-agent/Java-Plugin-Development-Guide.md
* Fix FileWriter scheduled task NPE
+* Optimize gRPC Log reporter to set service name for the first element in the streaming.(No change for Kafka reporter)
#### Documentation
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
index 8db7fbd..6c313a7 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
@@ -21,7 +21,6 @@
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
-
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -42,10 +41,10 @@
import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
@DefaultImplementor
-public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData> {
+public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData.Builder> {
private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class);
- private volatile DataCarrier<LogData> carrier;
+ private volatile DataCarrier<LogData.Builder> carrier;
private volatile GRPCChannelStatus status;
private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub;
@@ -70,7 +69,7 @@
}
- public void produce(LogData logData) {
+ public void produce(LogData.Builder logData) {
if (Objects.nonNull(logData) && !carrier.produce(logData)) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("One log has been abandoned, cause by buffer is full.");
@@ -84,7 +83,7 @@
}
@Override
- public void consume(final List<LogData> dataList) {
+ public void consume(final List<LogData.Builder> dataList) {
if (CollectionUtil.isEmpty(dataList)) {
return;
}
@@ -95,31 +94,39 @@
StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
.withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.collect(
- new StreamObserver<Commands>() {
- @Override
- public void onNext(final Commands commands) {
+ new StreamObserver<Commands>() {
+ @Override
+ public void onNext(final Commands commands) {
- }
+ }
- @Override
- public void onError(final Throwable throwable) {
- status.finished();
- LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
- dataList.size()
- );
- ServiceManager.INSTANCE
- .findService(GRPCChannelManager.class)
- .reportError(throwable);
- }
+ @Override
+ public void onError(final Throwable throwable) {
+ status.finished();
+ LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
+ dataList.size()
+ );
+ ServiceManager.INSTANCE
+ .findService(GRPCChannelManager.class)
+ .reportError(throwable);
+ }
- @Override
- public void onCompleted() {
- status.finished();
- }
- });
+ @Override
+ public void onCompleted() {
+ status.finished();
+ }
+ });
- for (final LogData logData : dataList) {
- logDataStreamObserver.onNext(logData);
+ boolean isFirst = true;
+ for (final LogData.Builder logData : dataList) {
+ if (isFirst) {
+ // Only set service name of the first element in one stream
+ // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
+ // Log collecting protocol defines LogData#service is required in the first element only.
+ logData.setService(Config.Agent.SERVICE_NAME);
+ isFirst = false;
+ }
+ logDataStreamObserver.onNext(logData.build());
}
logDataStreamObserver.onCompleted();
status.wait4Finish();
@@ -127,7 +134,7 @@
}
@Override
- public void onError(final List<LogData> data, final Throwable t) {
+ public void onError(final List<LogData.Builder> data, final Throwable t) {
LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size());
}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
index 642cbe8..ca3c3af 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
@@ -74,12 +74,11 @@
*
* @param appender the real {@link AppenderSkeleton appender}
* @param event {@link LoggingEvent}
- * @return {@link LogData} with filtered trace context in order to reduce the cost on the network
+ * @return {@link LogData.Builder} with filtered trace context in order to reduce the cost on the network
*/
- private LogData transform(final AppenderSkeleton appender, LoggingEvent event) {
+ private LogData.Builder transform(final AppenderSkeleton appender, LoggingEvent event) {
LogData.Builder builder = LogData.newBuilder()
.setTimestamp(event.getTimeStamp())
- .setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.setTraceContext(TraceContext.newBuilder()
.setTraceId(ContextManager.getGlobalTraceId())
@@ -102,12 +101,12 @@
builder.setEndpoint(primaryEndpointName);
}
- return -1 == ContextManager.getSpanId() ? builder.build()
+ return -1 == ContextManager.getSpanId() ? builder
: builder.setTraceContext(TraceContext.newBuilder()
.setTraceId(ContextManager.getGlobalTraceId())
.setSpanId(ContextManager.getSpanId())
.setTraceSegmentId(ContextManager.getSegmentId())
- .build()).build();
+ .build());
}
private String transformLogText(final AppenderSkeleton appender, final LoggingEvent event) {
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java
index 6e50f25..0b94b68 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java
@@ -80,7 +80,7 @@
* @param event {@link LogEvent}
* @return {@link LogData} with filtered trace context in order to reduce the cost on the network
*/
- private LogData transform(final AbstractAppender appender, LogEvent event) {
+ private LogData.Builder transform(final AbstractAppender appender, LogEvent event) {
LogTags.Builder logTags = LogTags.newBuilder()
.addData(KeyStringValuePair.newBuilder()
.setKey("level").setValue(event.getLevel().toString()).build())
@@ -123,14 +123,14 @@
.setTraceId(context.getTraceId())
.setSpanId(context.getSpanId())
.setTraceSegmentId(context.getTraceSegmentId())
- .build()).build();
+ .build());
} else {
- return -1 == ContextManager.getSpanId() ? builder.build()
+ return -1 == ContextManager.getSpanId() ? builder
: builder.setTraceContext(TraceContext.newBuilder()
.setTraceId(ContextManager.getGlobalTraceId())
.setSpanId(ContextManager.getSpanId())
.setTraceSegmentId(ContextManager.getSegmentId())
- .build()).build();
+ .build());
}
}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java
index b81b36d..d1072c7 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java
@@ -80,7 +80,7 @@
* @param event {@link ILoggingEvent}
* @return {@link LogData} with filtered trace context in order to reduce the cost on the network
*/
- private LogData transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
+ private LogData.Builder transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
LogTags.Builder logTags = LogTags.newBuilder()
.addData(KeyStringValuePair.newBuilder()
.setKey("level").setValue(event.getLevel().toString()).build())
@@ -118,12 +118,12 @@
builder.setEndpoint(primaryEndpointName);
}
- return -1 == ContextManager.getSpanId() ? builder.build()
+ return -1 == ContextManager.getSpanId() ? builder
: builder.setTraceContext(TraceContext.newBuilder()
.setTraceId(ContextManager.getGlobalTraceId())
.setSpanId(ContextManager.getSpanId())
.setTraceSegmentId(ContextManager.getSegmentId())
- .build()).build();
+ .build());
}
private String transformLogText(final OutputStreamAppender<ILoggingEvent> appender, final ILoggingEvent event) {
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
index 5c65145..43b251e 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.network.logging.v3.LogData;
@@ -41,18 +42,21 @@
}
@Override
- public void produce(final LogData logData) {
+ public void produce(final LogData.Builder logData) {
super.produce(logData);
}
@Override
- public void consume(final List<LogData> dataList) {
+ public void consume(final List<LogData.Builder> dataList) {
if (producer == null || CollectionUtil.isEmpty(dataList)) {
return;
}
- for (LogData data : dataList) {
- producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.toByteArray())));
+ for (LogData.Builder data : dataList) {
+ // Kafka Log reporter sends one log per time.
+ // Every time, service name should be set to keep data integrity.
+ data.setService(Config.Agent.SERVICE_NAME);
+ producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.build().toByteArray())));
}
}