blob: 3f1d91ee00ed461161c5438b1f1ca41e8b017bd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package golang
// import (
// "context"
// "io"
// "reflect"
// "sync"
// "time"
// "github.com/apache/rocketmq-clients/golang/pkg/ticker"
// v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
// )
// type Consumer interface {
// Start() error
// Consume(ctx context.Context, topic string, handler Handler) error
// GracefulStop() error
// }
// var _ = Consumer(&consumer{})
// type consumer struct {
// opts consumerOptions
// ns NameServer
// assignment sync.Map
// handlers map[string]Handler
// done chan struct{}
// }
// type Handler func(ctx context.Context, entry *MessageExt) error
// func NewConsumer(config *Config, opts ...ConsumerOption) (Consumer, error) {
// ns, err := NewNameServer(config)
// if err != nil {
// return nil, err
// }
// c := &consumer{
// opts: defaultConsumerOptions,
// handlers: make(map[string]Handler),
// done: make(chan struct{}),
// }
// for _, opt := range opts {
// opt.apply(&c.opts)
// }
// c.ns = ns
// return c, nil
// }
// func (c *consumer) Start() error {
// c.assignment.Range(func(k, v interface{}) bool {
// topic, ok := k.(string)
// if ok {
// handler := c.handlers[topic]
// f := func() {
// ctx, _ := context.WithTimeout(context.TODO(), time.Second*20)
// b, err := c.ns.GetBroker(ctx, topic)
// if err != nil {
// return
// }
// assign, ok := v.([]*v2.Assignment)
// if !ok || len(assign) == 0 {
// return
// }
// var wg sync.WaitGroup
// for i := 0; i < len(assign); i++ {
// wg.Add(1)
// i := i
// go func() {
// defer wg.Done()
// stream, err := b.ReceiveMessage(ctx, assign[i].MessageQueue, topic)
// if err != nil {
// return
// }
// for {
// resp, err := stream.Recv()
// if err == io.EOF {
// break
// }
// if err != nil {
// time.Sleep(time.Second * 5)
// continue
// }
// msg, ok := resp.GetContent().(*v2.ReceiveMessageResponse_Message)
// if !ok {
// continue
// }
// messageExt := &MessageExt{
// MessageID: msg.Message.GetSystemProperties().GetMessageId(),
// ReceiptHandle: msg.Message.GetSystemProperties().GetReceiptHandle(),
// Message: Message{
// Topic: msg.Message.GetTopic().GetName(),
// Body: msg.Message.GetBody(),
// Keys: msg.Message.GetSystemProperties().GetKeys(),
// Tag: msg.Message.GetSystemProperties().GetTag(),
// Properties: msg.Message.GetUserProperties(),
// },
// }
// if err = handler(ctx, messageExt); err == nil {
// _ = b.AckMessage(ctx, messageExt)
// }
// }
// }()
// }
// wg.Wait()
// }
// ticker.Tick(f, time.Second*20, c.done)
// }
// return true
// })
// <-c.done
// return nil
// }
// func (c *consumer) Consume(ctx context.Context, topic string, handler Handler) error {
// if err := c.queryAssignment(ctx, topic); err != nil {
// return err
// }
// c.handlers[topic] = handler
// return nil
// }
// func (c *consumer) queryAssignment(ctx context.Context, topic string) error {
// b, err := c.ns.GetBroker(ctx, topic)
// if err != nil {
// return err
// }
// assignment, err := b.QueryAssignment(ctx, topic)
// if err != nil {
// return err
// }
// c.assignment.Store(topic, assignment)
// f := func() {
// assign, err := b.QueryAssignment(ctx, topic)
// if err != nil {
// return
// }
// cache, ok := c.assignment.Load(topic)
// if !ok {
// return
// }
// oldAssign, ok := cache.([]*v2.Assignment)
// if !ok {
// return
// }
// if reflect.DeepEqual(assign, oldAssign) {
// return
// }
// c.assignment.Store(topic, assign)
// }
// ticker.Tick(f, time.Second*10, c.done)
// return nil
// }
// func (c *consumer) GracefulStop() error {
// defer close(c.done)
// c.done <- struct{}{}
// return c.ns.GracefulStop()
// }