blob: 94259852d46f4a2a4bbe9e6d5174ce6bb78256ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.toolkit.logging.common.log;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
import org.apache.skywalking.apm.util.StringUtil;
/**
* Report log to server by grpc
*/
@OverrideImplementor(LogReportServiceClient.class)
public class GRPCLogReportServiceClient extends LogReportServiceClient {
private static final ILog LOGGER = LogManager.getLogger(GRPCLogReportServiceClient.class);
private volatile DataCarrier<LogData> carrier;
private LogReportServiceGrpc.LogReportServiceStub asyncStub;
private ManagedChannel channel;
private AtomicBoolean disconnected = new AtomicBoolean(false);
private static final Metadata.Key<String> AUTH_HEAD_HEADER_NAME = Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER);
@Override
public void boot() throws Throwable {
carrier = new DataCarrier<>("gRPC-log", "gRPC-log",
Config.Buffer.CHANNEL_SIZE,
Config.Buffer.BUFFER_SIZE,
BufferStrategy.IF_POSSIBLE
);
carrier.consume(this, 1);
channel = ManagedChannelBuilder
.forAddress(
ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.SERVER_HOST,
ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.SERVER_PORT
)
.usePlaintext()
.build();
Channel decoratedChannel = decorateLogChannelWithAuthentication(channel);
asyncStub = LogReportServiceGrpc.newStub(decoratedChannel)
.withMaxOutboundMessageSize(
ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.MAX_MESSAGE_SIZE);
}
@Override
public void shutdown() {
try {
carrier.shutdownConsumers();
if (channel != null) {
channel.shutdownNow();
}
} catch (Throwable t) {
LOGGER.error(t.getMessage(), t);
}
}
@Override
public void produce(LogData logData) {
if (Objects.nonNull(logData) && !carrier.produce(logData)) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("One log has been abandoned, cause by buffer is full.");
}
}
}
@Override
public void consume(final List<LogData> dataList) {
if (CollectionUtil.isEmpty(dataList)) {
return;
}
StreamObserver<LogData> reportStreamObserver = null;
final GRPCStreamServiceStatus waitStatus = new GRPCStreamServiceStatus(false);
try {
reportStreamObserver = asyncStub.withDeadlineAfter(
ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
}
@Override
public void onError(Throwable t) {
waitStatus.finished();
if (disconnected.compareAndSet(false, true)) {
LOGGER.error("Send log to gRPC server fail with an internal exception.", t);
}
LOGGER.error(t, "Try to send {} log data to collector, with unexpected exception.",
dataList.size()
);
}
@Override
public void onCompleted() {
disconnected.compareAndSet(true, false);
waitStatus.finished();
}
});
for (final LogData logData : dataList) {
reportStreamObserver.onNext(logData);
}
} catch (Throwable e) {
if (!(e instanceof StatusRuntimeException)) {
LOGGER.error(e, "Report log failure with the gRPC client.");
}
} finally {
if (reportStreamObserver != null) {
reportStreamObserver.onCompleted();
}
waitStatus.wait4Finish();
}
}
private Channel decorateLogChannelWithAuthentication(Channel channel) {
if (StringUtil.isEmpty(Config.Agent.AUTHENTICATION)) {
return channel;
}
return ClientInterceptors.intercept(channel, new ClientInterceptor() {
@Override
public <REQ, RESP> ClientCall<REQ, RESP> interceptCall(MethodDescriptor<REQ, RESP> method,
CallOptions options, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<REQ, RESP>(channel.newCall(method, options)) {
@Override
public void start(Listener<RESP> responseListener, Metadata headers) {
headers.put(AUTH_HEAD_HEADER_NAME, Config.Agent.AUTHENTICATION);
super.start(responseListener, headers);
}
};
}
});
}
}