blob: 5a98ba10bdfa1942d1c3957d97002663d7bf027c [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package mq
import (
import (
perrors ""
import (
var (
mqClient *Client
once sync.Once
consumerFacadeMap sync.Map
func NewSingletonMQClient(config Config) *Client {
if mqClient == nil {
once.Do(func() {
var err error
mqClient, err = NewMQClient(config)
if err != nil {
logger.Errorf("create mq client failed, %s", err.Error())
return mqClient
func NewMQClient(config Config) (*Client, error) {
var c *Client
ctx := context.Background()
switch config.MqType {
case constant.MQTypeKafka:
config.KafkaProducerConfig.Timeout = config.Timeout
pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
if err != nil {
return nil, err
c = &Client{
ctx: ctx,
producerFacade: pf,
kafkaConsumerConfig: config.KafkaConsumerConfig,
case constant.MQTypeRocketMQ:
return nil, perrors.New("rocketmq not support")
return c, nil
type Client struct {
ctx context.Context
producerFacade ProducerFacade
kafkaConsumerConfig KafkaConsumerConfig
func (c Client) Apply() error {
panic("implement me")
func (c Client) Close() error {
return nil
// Call handles one MQ request. It reads the whole ingress request body, picks
// an action from the first "/"-separated segment of req.API.Path, and then
// publishes, subscribes or unsubscribes according to the JSON-decoded body.
//
// In the lines visible here every return passes nil as the result; only the
// error carries information.
//
// NOTE(review): this view of the function has no closing braces, so the
// nesting of the branches below is inferred — confirm against the full file.
func (c Client) Call(req *client.Request) (res interface{}, err error) {
body, err := io.ReadAll(req.IngressRequest.Body)
if err != nil {
return nil, err
// The path must split into at least three segments, otherwise the request is
// rejected with the "broker or Topic not found" error below.
// NOTE(review): if req.API.Path begins with "/", paths[0] is the empty
// string — confirm the expected path format.
paths := strings.Split(req.API.Path, "/")
if len(paths) < 3 {
return nil, perrors.New("failed to send message, broker or Topic not found")
// The first path segment is looked up in MQActionStrToInt to select the action.
switch MQActionStrToInt[paths[0]] {
case MQActionPublish:
// Publish: decode the body and send pReq.Msg with pReq.Topic as the topic option.
var pReq MQProduceRequest
err = json.Unmarshal(body, &pReq)
if err != nil {
return nil, err
err = c.producerFacade.Send(pReq.Msg, WithTopic(pReq.Topic))
if err != nil {
return nil, err
case MQActionSubscribe:
// Subscribe: decode the body, then make sure a consumer facade is stored for
// the requested consumer group.
var cReq MQSubscribeRequest
err = json.Unmarshal(body, &cReq)
if err != nil {
return nil, err
// A facade is created and stored only when none is stored for this group yet.
// NOTE(review): Load followed by Store is not a single atomic step —
// presumably two concurrent first subscriptions for one group could each
// create a facade; confirm whether that matters here.
if _, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); !ok {
facade, err := NewKafkaConsumerFacade(c.kafkaConsumerConfig, cReq.ConsumerGroup)
if err != nil {
return nil, err
consumerFacadeMap.Store(cReq.ConsumerGroup, facade)
if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
cf := f.(ConsumerFacade)
// The subscription runs under a context derived from c.ctx that expires
// after req.Timeout; cancel is deferred until Call returns.
ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
defer cancel()
err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
if err != nil {
return nil, err
case MQActionUnSubscribe:
// Unsubscribe: decode the body and look up the facade for the consumer group.
var cReq MQUnSubscribeRequest
err = json.Unmarshal(body, &cReq)
if err != nil {
return nil, err
// NOTE(review): facade is loaded but not used in the lines visible here, and
// err appears to be nil at this point — confirm that the consumer is stopped
// and removed from consumerFacadeMap.
if facade, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
return nil, err
// NOTE(review): presumably the fallback for an unrecognized action (no
// "default:" label is visible in this view) — verify.
return nil, perrors.New("failed to get mq action")
return nil, nil
func (c Client) MapParams(req *client.Request) (reqData interface{}, err error) {
return nil, perrors.New("map params does not support on mq mqClient")