// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
	"context"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/compress"
	"google.golang.org/protobuf/proto"

	agentv3 "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
	logv3 "github.com/apache/skywalking-go/protocols/collect/logging/v3"
	managementv3 "github.com/apache/skywalking-go/protocols/collect/management/v3"

	"github.com/apache/skywalking-go/plugins/core/operator"
	"github.com/apache/skywalking-go/plugins/core/reporter"
)
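
// kafkaMaxSendQueueSize bounds each in-memory send channel: when a channel is
// full, new data is dropped rather than blocking the instrumented application.
// topicKeyRegister prefixes the message key of instance-properties
// registrations, presumably so the receiver can distinguish them from plain
// pings on the management topic.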
const (
	kafkaMaxSendQueueSize int32 = 30000
	topicKeyRegister            = "register-"
)
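
// internalReporterContextKey marks the context of every Kafka write the
// reporter performs itself, presumably so the agent's kafka-go instrumentation
// can recognize and skip the reporter's internal traffic.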
var internalReporterContextKey = context.Background()
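
// kafkaReporter ships tracing segments, meter data, logs, and instance
// management messages to Kafka topics through a shared kafka-go writer.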
type kafkaReporter struct {
	entity           *reporter.Entity
	logger           operator.LogOperator
	writer           *kafka.Writer
	brokerAddr       []string
	tracingSendCh    chan *agentv3.SegmentObject
	metricsSendCh    chan []*agentv3.MeterData
	logSendCh        chan *logv3.LogData
	bootFlag         bool
	checkInterval    time.Duration
	connectionStatus reporter.ConnectionStatus
	topicSegment     string
	topicMeter       string
	topicLogging     string
	topicManagement  string
	transform        *reporter.Transform
	cdsManager       *reporter.CDSManager
}
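
// NewKafkaReporter builds a reporter for the given comma-separated broker
// list. The writer defaults below (batching, acks, compression) can be
// overridden, and the topic names supplied, through ReporterOptionKafka
// options.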
func NewKafkaReporter(logger operator.LogOperator,
	brokers string,
	checkInterval time.Duration,
	cdsManager *reporter.CDSManager,
	opts ...ReporterOptionKafka,
) (reporter.Reporter, error) {
	r := &kafkaReporter{
		logger:           logger,
		tracingSendCh:    make(chan *agentv3.SegmentObject, kafkaMaxSendQueueSize),
		metricsSendCh:    make(chan []*agentv3.MeterData, kafkaMaxSendQueueSize),
		logSendCh:        make(chan *logv3.LogData, kafkaMaxSendQueueSize),
		checkInterval:    checkInterval,
		connectionStatus: reporter.ConnectionStatusDisconnect,
		cdsManager:       cdsManager,
	}
	r.brokerAddr = strings.Split(brokers, ",")
	writer := &kafka.Writer{
		Addr:                   kafka.TCP(r.brokerAddr...),
		Balancer:               &kafka.RoundRobin{},
		MaxAttempts:            10,
		BatchSize:              1000,
		BatchBytes:             1048576,
		BatchTimeout:           1000 * time.Millisecond,
		RequiredAcks:           kafka.RequireOne,
		Async:                  false,
		Compression:            compress.None,
		ErrorLogger:            kafka.LoggerFunc(logger.Errorf),
		AllowAutoTopicCreation: true,
	}
	r.writer = writer
	for _, opt := range opts {
		opt(r)
	}
	return r, nil
}
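
// Boot finishes initialization: it builds the entity transform, probes the
// broker connection, starts the send loops and the heartbeat goroutine, and
// registers the CDS watchers.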
func (r *kafkaReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
	r.entity = entity
	r.transform = reporter.NewTransform(entity)
	r.updateConnectionStatus()
	r.initSendPipeline()
	r.check()
	r.cdsManager.InitCDS(entity, cdsWatchers)
	r.bootFlag = true
}
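
// updateConnectionStatus records the result of a one-off broker probe.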
func (r *kafkaReporter) updateConnectionStatus() {
	if r.checkKafkaConnection() {
		r.connectionStatus = reporter.ConnectionStatusConnected
	} else {
		r.connectionStatus = reporter.ConnectionStatusDisconnect
	}
}
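
// checkKafkaConnection dials the first broker and requests the broker list;
// any failure is treated as disconnected.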
func (r *kafkaReporter) checkKafkaConnection() bool {
	firstAddr := r.brokerAddr[0]
	conn, err := kafka.Dial("tcp", firstAddr)
	if err != nil {
		r.logger.Errorf("kafka connection error %v", err)
		return false
	}
	defer func() {
		if err = conn.Close(); err != nil {
			r.logger.Errorf("kafka connection close error %v", err)
		}
	}()
	if _, err = conn.Brokers(); err != nil {
		r.logger.Errorf("kafka connection error %v", err)
		return false
	}
	return true
}
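
// initSendPipeline starts one goroutine per signal type; each drains its
// channel until Close closes it.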
func (r *kafkaReporter) initSendPipeline() {
	go r.tracingSendLoop()
	go r.metricsSendLoop()
	go r.logSendLoop()
}
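
// tracingSendLoop marshals queued segments and writes them to the segment
// topic, keyed by trace segment ID. Write failures are throttled: the first
// error is logged, then every logFrequency-th consecutive one.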
func (r *kafkaReporter) tracingSendLoop() {
	consecutiveErrors := 0
	logFrequency := 30
	for s := range r.tracingSendCh {
		payload, err := proto.Marshal(s)
		if err != nil {
			r.logger.Errorf("marshal segment error %v", err)
			continue
		}
		ctx := context.WithValue(context.Background(), internalReporterContextKey, true)
		err = r.writer.WriteMessages(ctx, kafka.Message{
			Topic: r.topicSegment,
			Key:   []byte(s.GetTraceSegmentId()),
			Value: payload,
		})
		if err != nil {
			consecutiveErrors++
			if consecutiveErrors == 1 || consecutiveErrors%logFrequency == 0 {
				r.logger.Errorf("send segment to kafka error %v (errors: %d)", err, consecutiveErrors)
			}
			continue
		}
		consecutiveErrors = 0
	}
}
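
// metricsSendLoop wraps each batch in a MeterDataCollection and writes it to
// the meter topic, keyed by service instance name, with the same error
// throttling as the tracing loop.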
func (r *kafkaReporter) metricsSendLoop() {
	consecutiveErrors := 0
	logFrequency := 30
	for s := range r.metricsSendCh {
		payload, err := proto.Marshal(&agentv3.MeterDataCollection{
			MeterData: s,
		})
		if err != nil {
			r.logger.Errorf("marshal metrics error %v", err)
			continue
		}
		ctx := context.WithValue(context.Background(), internalReporterContextKey, true)
		err = r.writer.WriteMessages(ctx, kafka.Message{
			Topic: r.topicMeter,
			Key:   []byte(r.entity.ServiceInstanceName),
			Value: payload,
		})
		if err != nil {
			consecutiveErrors++
			if consecutiveErrors == 1 || consecutiveErrors%logFrequency == 0 {
				r.logger.Errorf("send metrics to kafka error %v (errors: %d)", err, consecutiveErrors)
			}
			continue
		}
		consecutiveErrors = 0
	}
}
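
// logSendLoop writes queued log records to the logging topic, keyed by
// service name, with the same error throttling as the other loops.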
func (r *kafkaReporter) logSendLoop() {
	consecutiveErrors := 0
	logFrequency := 30
	for s := range r.logSendCh {
		payload, err := proto.Marshal(s)
		if err != nil {
			r.logger.Errorf("marshal log error %v", err)
			continue
		}
		ctx := context.WithValue(context.Background(), internalReporterContextKey, true)
		err = r.writer.WriteMessages(ctx, kafka.Message{
			Topic: r.topicLogging,
			Key:   []byte(s.Service),
			Value: payload,
		})
		if err != nil {
			consecutiveErrors++
			if consecutiveErrors == 1 || consecutiveErrors%logFrequency == 0 {
				r.logger.Errorf("send log to kafka error %v (errors: %d)", err, consecutiveErrors)
			}
			continue
		}
		consecutiveErrors = 0
	}
}
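
// SendTracing transforms the finished spans into a segment and enqueues it,
// dropping the segment (with an error log) when the buffer is full.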
func (r *kafkaReporter) SendTracing(spans []reporter.ReportedSpan) {
	segmentObject := r.transform.TransformSegmentObject(spans)
	if segmentObject == nil {
		return
	}
	defer func() {
		// recover from the panic caused by sending on tracingSendCh after Close closed it
		if err := recover(); err != nil {
			r.logger.Errorf("reporter segment err %v", err)
		}
	}()
	select {
	case r.tracingSendCh <- segmentObject:
	default:
		r.logger.Errorf("tracing send buffer is full, dropping the segment")
	}
}
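
// SendMetrics enqueues the transformed meter batch, dropping it (with an
// error log) when the buffer is full.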
func (r *kafkaReporter) SendMetrics(metrics []reporter.ReportedMeter) {
	meters := r.transform.TransformMeterData(metrics)
	if meters == nil {
		return
	}
	defer func() {
		// recover from the panic caused by sending on metricsSendCh after Close closed it
		if err := recover(); err != nil {
			r.logger.Errorf("reporter metrics err %v", err)
		}
	}()
	select {
	case r.metricsSendCh <- meters:
	default:
		r.logger.Errorf("metrics send buffer is full, dropping the batch")
	}
}
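
// SendLog enqueues a log record. Unlike tracing and metrics, a full buffer
// drops the record silently, presumably to avoid generating yet more log
// traffic while the pipeline is saturated.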
func (r *kafkaReporter) SendLog(log *logv3.LogData) {
	defer func() {
		// recover from the panic caused by sending on logSendCh after Close closed it
		if err := recover(); err != nil {
			r.logger.Errorf("reporter log err %v", err)
		}
	}()
	select {
	case r.logSendCh <- log:
	default:
	}
}
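
// check starts the heartbeat goroutine: after an initial delay it registers
// the instance properties once (retrying each interval until the write
// succeeds), then sends a ping on the management topic every checkInterval.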
func (r *kafkaReporter) check() {
	if r.checkInterval < 0 || r.writer == nil {
		return
	}
	go func() {
		// wait one interval before the first registration attempt
		time.Sleep(r.checkInterval)
		instancePropertiesSubmitted := false
		for {
			if !instancePropertiesSubmitted {
				instanceProperties := &managementv3.InstanceProperties{
					Service:         r.entity.ServiceName,
					ServiceInstance: r.entity.ServiceInstanceName,
					Properties:      r.entity.Props,
				}
				payload, err := proto.Marshal(instanceProperties)
				if err != nil {
					r.logger.Errorf("marshal instance properties error %v", err)
					time.Sleep(r.checkInterval)
					continue
				}
				ctx := context.WithValue(context.Background(), internalReporterContextKey, true)
				err = r.writer.WriteMessages(ctx, kafka.Message{
					Topic: r.topicManagement,
					Key:   []byte(topicKeyRegister + r.entity.ServiceInstanceName),
					Value: payload,
				})
				if err != nil {
					r.logger.Errorf("send instance properties to kafka error %v", err)
					time.Sleep(r.checkInterval)
					continue
				}
				instancePropertiesSubmitted = true
			}
			ping := &managementv3.InstancePingPkg{
				Service:         r.entity.ServiceName,
				ServiceInstance: r.entity.ServiceInstanceName,
			}
			payload, err := proto.Marshal(ping)
			if err != nil {
				r.logger.Errorf("marshal instance ping error %v", err)
				time.Sleep(r.checkInterval)
				continue
			}
			ctx := context.WithValue(context.Background(), internalReporterContextKey, true)
			err = r.writer.WriteMessages(ctx, kafka.Message{
				Topic: r.topicManagement,
				Key:   []byte(r.entity.ServiceInstanceName),
				Value: payload,
			})
			if err != nil {
				r.logger.Errorf("send instance ping to kafka error %v", err)
			}
			time.Sleep(r.checkInterval)
		}
	}()
}
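
// ConnectionStatus returns the status captured at Boot; it is not refreshed
// afterward.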
func (r *kafkaReporter) ConnectionStatus() reporter.ConnectionStatus {
	return r.connectionStatus
}
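
// Close shuts down the send channels (the send methods recover from the
// resulting panics) and closes the underlying writer.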
func (r *kafkaReporter) Close() {
	if r.bootFlag {
		if r.tracingSendCh != nil {
			close(r.tracingSendCh)
		}
		if r.metricsSendCh != nil {
			close(r.metricsSendCh)
		}
		if r.logSendCh != nil {
			close(r.logSendCh)
		}
		if err := r.writer.Close(); err != nil {
			r.logger.Errorf("close kafka writer failed, err: %v", err)
		}
	}
}
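
// AddProfileTaskManager is a no-op: profiling tasks are not handled by the
// Kafka reporter.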
func (r *kafkaReporter) AddProfileTaskManager(p reporter.ProfileTaskManager) {}