blob: 49f3839890182117a1140f50838bd54ef88d84a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.kafka;
import com.google.gson.Gson;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Span;
import zipkin2.SpanBytesDecoderDetector;
import zipkin2.codec.BytesDecoder;
@Slf4j
public class KafkaHandler {
private final ZipkinReceiverConfig config;
private final SpanForward spanForward;
private final Properties properties;
private final ThreadPoolExecutor executor;
private final boolean enableKafkaMessageAutoCommit;
private final List<String> topics;
private final CounterMetrics msgDroppedIncr;
private final CounterMetrics errorCounter;
private final HistogramMetrics histogram;
public KafkaHandler(final ZipkinReceiverConfig config, ModuleManager manager) {
this.config = config;
this.spanForward = new SpanForward(config, manager);
properties = new Properties();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getKafkaGroupId());
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaBootstrapServers());
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Gson gson = new Gson();
Properties override = gson.fromJson(config.getKafkaConsumerConfig(), Properties.class);
properties.putAll(override);
int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
if (config.getKafkaHandlerThreadPoolSize() > 0) {
threadPoolSize = config.getKafkaHandlerThreadPoolSize();
}
int threadPoolQueueSize = 10000;
if (config.getKafkaHandlerThreadPoolQueueSize() > 0) {
threadPoolQueueSize = config.getKafkaHandlerThreadPoolQueueSize();
}
enableKafkaMessageAutoCommit = new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadPoolQueueSize),
new CustomThreadFactory("Zipkin-Kafka-Consumer"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
topics = Arrays.asList(config.getKafkaTopic().split(","));
MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"trace_in_latency",
"The process latency of trace data",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("zipkin-kafka")
);
msgDroppedIncr = metricsCreator.createCounter(
"trace_dropped_count", "The dropped number of traces",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-kafka"));
errorCounter = metricsCreator.createCounter(
"trace_analysis_error_count", "The error number of trace analysis",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-kafka")
);
}
public void start() throws ModuleStartException {
for (int i = 0; i < config.getKafkaConsumers(); i++) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(topics);
consumer.seekToEnd(consumer.assignment());
executor.submit(() -> runTask(consumer));
}
}
private void runTask(final KafkaConsumer<byte[], byte[]> consumer) {
if (log.isDebugEnabled()) {
log.debug("Start Consume zipkin trace records from kafka.");
}
while (true) {
try {
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000L));
if (log.isDebugEnabled()) {
log.debug(
"Consume zipkin trace records from kafka, records count:[{}].",
consumerRecords.count()
);
}
if (!consumerRecords.isEmpty()) {
for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
final byte[] bytes = record.value();
//empty or illegal message
if (bytes.length < 2) {
msgDroppedIncr.inc();
continue;
}
executor.submit(() -> handleRecord(bytes));
}
if (!enableKafkaMessageAutoCommit) {
consumer.commitAsync();
}
}
} catch (Exception e) {
log.error("Kafka handle message error.", e);
errorCounter.inc();
}
}
}
private void handleRecord(byte[] bytes) {
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
BytesDecoder<Span> decoder = SpanBytesDecoderDetector.decoderForListMessage(bytes);
final List<Span> spanList = decoder.decodeList(bytes);
spanForward.send(spanList);
}
}
}