// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka
import (
const Name = "kafka-client"
type Client struct {
Brokers string `mapstructure:"brokers"` // The Kafka broker addresses (default `localhost:9092`).
Version string `mapstructure:"version"` // The version should follow this pattern, which is `major.minor.veryMinor.patch`.
EnableTLS bool `mapstructure:"enable_TLS"` // The TLS switch (default false).
ClientPemPath string `mapstructure:"client_pem_path"` // The file path of client.pem. The config only works when opening the TLS switch.
ClientKeyPath string `mapstructure:"client_key_path"` // The file path of client.key. The config only works when opening the TLS switch.
CaPemPath string `mapstructure:"ca_pem_path"` // The file path oca.pem. The config only works when opening the TLS switch.
RequiredAcks int16 `mapstructure:"required_acks"` // 0 means NoResponse, 1 means WaitForLocal and -1 means WaitForAll (default 1).
ProducerMaxRetry int `mapstructure:"producer_max_retry"` // The producer max retry times (default 3).
MetaMaxRetry int `mapstructure:"meta_max_retry"` // The meta max retry times (default 3).
RetryBackoff int `mapstructure:"retry_backoff"` // How long to wait for the cluster to settle between retries (default 100ms).
MaxMessageBytes int `mapstructure:"max_message_bytes"` // The max message bytes.
IdempotentWrites bool `mapstructure:"idempotent_writes"` // Ensure that exactly one copy of each message is written when is true.
ClientID string `mapstructure:"client_id"` // A user-provided string sent with every request to the brokers.
CompressionCodec int `mapstructure:"compression_codec"` // Represents the various compression codecs recognized by Kafka in messages.
RefreshPeriod int `mapstructure:"refresh_period"` // How frequently to refresh the cluster metadata.
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` // Controls whether a client verifies the server's certificate chain and host name.
// components
client sarama.Client // The kafka producer.
listeners []chan<- api.ClientStatus
status api.ClientStatus
ctx context.Context // Parent ctx
cancel context.CancelFunc // Parent ctx cancel function
func (c *Client) Name() string {
return Name
func (c *Client) Description() string {
return "The Kafka client is a sharing plugin to keep connection with the Kafka brokers and delivery the data to it."
func (c *Client) DefaultConfig() string {
return `
# The Kafka broker addresses (default localhost:9092). Multiple values are separated by commas.
brokers: localhost:9092
# The Kakfa version should follow this pattern, which is major_minor_veryMinor_patch (default
# The TLS switch (default false).
enable_TLS: false
# The file path of client.pem. The config only works when opening the TLS switch.
client_pem_path: ""
# The file path of client.key. The config only works when opening the TLS switch.
client_key_path: ""
# The file path oca.pem. The config only works when opening the TLS switch.
ca_pem_path: ""
# 0 means NoResponse, 1 means WaitForLocal and -1 means WaitForAll (default 1).
required_acks: 1
# The producer max retry times (default 3).
producer_max_retry: 3
# The meta max retry times (default 3).
meta_max_retry: 3
# How long to wait for the cluster to settle between retries (default 100ms). Time unit is ms.
retry_backoff: 100
# The max message bytes.
max_message_bytes: 1000000
# If enabled, the producer will ensure that exactly one copy of each message is written (default false).
idempotent_writes: false
# A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes (default Satellite).
client_id: Satellite
# Compression codec represents the various compression codecs recognized by Kafka in messages. 0 : None, 1 : Gzip, 2 : Snappy, 3 : LZ4, 4 : ZSTD
compression_codec: 0
# How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes. The unit is minute.
refresh_period: 10
# InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
insecure_skip_verify: true
func (c *Client) Prepare() error {
cfg, err := c.loadConfig()
if err != nil {
return fmt.Errorf("cannot init the kafka producer: %v", err)
sarama.Logger = log.Logger
client, err := sarama.NewClient(strings.Split(c.Brokers, ","), cfg)
if err != nil {
return fmt.Errorf("cannot init the kafka client: %v", err)
c.client = client
c.status = api.Connected
c.ctx, c.cancel = context.WithCancel(context.Background())
c.listeners = make([]chan<- api.ClientStatus, 0)
return nil
func (c *Client) Close() error {
defer log.Logger.Info("kafka client is closed")
return c.client.Close()
func (c *Client) GetConnectedClient() interface{} {
return c.client
func (c *Client) RegisterListener(listener chan<- api.ClientStatus) {
c.listeners = append(c.listeners, listener)
func (c *Client) Start() error {
// start supported processes.
go c.snifferBrokerStatus()
return nil