blob: 65181fc952db965bf1ee9a450904897343834105 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
/**
* A tracing segment data reporter.
*/
@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
private String topic;
private KafkaProducer<String, Bytes> producer;
private volatile DataCarrier<TraceSegment> carrier;
@Override
public void prepare() {
KafkaProducerManager producerManager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
producerManager.addListener(this);
topic = producerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT);
}
@Override
public void boot() {
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
}
@Override
public void onComplete() {
TracingContext.ListenerManager.add(this);
}
@Override
public void shutdown() {
TracingContext.ListenerManager.remove(this);
carrier.shutdownConsumers();
}
@Override
public void init(final Properties properties) {
}
@Override
public void consume(final List<TraceSegment> data) {
if (producer == null) {
return;
}
data.forEach(traceSegment -> {
SegmentObject upstreamSegment = traceSegment.transform();
ProducerRecord<String, Bytes> record = new ProducerRecord<>(
topic,
upstreamSegment.getTraceSegmentId(),
Bytes.wrap(upstreamSegment.toByteArray())
);
producer.send(record, (m, e) -> {
if (Objects.nonNull(e)) {
LOGGER.error("Failed to report TraceSegment.", e);
}
});
});
}
@Override
public void onError(final List<TraceSegment> data, final Throwable t) {
LOGGER.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());
}
@Override
public void onExit() {
}
@Override
public void afterFinished(final TraceSegment traceSegment) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("Trace segment reporting, traceId: {}", traceSegment.getTraceSegmentId());
}
if (traceSegment.isIgnore()) {
LOGGER.debug("Trace[TraceId={}] is ignored.", traceSegment.getTraceSegmentId());
return;
}
carrier.produce(traceSegment);
}
@Override
public void onStatusChanged(KafkaConnectionStatus status) {
if (status == KafkaConnectionStatus.CONNECTED) {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
}
}
}