| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package pulsar |
| |
| import ( |
| "fmt" |
| |
| pkgerrors "github.com/pkg/errors" |
| |
| "github.com/golang/protobuf/proto" |
| |
| "github.com/apache/pulsar-client-go/pulsar/internal" |
| "github.com/apache/pulsar-client-go/pulsar/internal/pb" |
| ) |
| |
| // NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error. |
| // Optionally provide a list of IDs associated with the message |
| // for additional context in the error message. |
| func newUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *unexpectedErrMsg { |
| return &unexpectedErrMsg{ |
| msgType: msgType, |
| ids: ids, |
| } |
| } |
| |
| // UnexpectedErrMsg is returned when an unexpected message is received. |
| type unexpectedErrMsg struct { |
| msgType pb.BaseCommand_Type |
| ids []interface{} |
| } |
| |
| // Error satisfies the error interface. |
| func (e *unexpectedErrMsg) Error() string { |
| msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String()) |
| for _, id := range e.ids { |
| msg += fmt.Sprintf(" consumerID=%v", id) |
| } |
| return msg |
| } |
| |
| func validateTopicNames(topics ...string) error { |
| var errs error |
| for _, t := range topics { |
| if _, err := internal.ParseTopicName(t); err != nil { |
| errs = pkgerrors.Wrapf(err, "invalid topic name: %s", t) |
| } |
| } |
| |
| return errs |
| } |
| |
| func toKeyValues(metadata map[string]string) []*pb.KeyValue { |
| kvs := make([]*pb.KeyValue, 0, len(metadata)) |
| for k, v := range metadata { |
| kv := &pb.KeyValue{ |
| Key: proto.String(k), |
| Value: proto.String(v), |
| } |
| kvs = append(kvs, kv) |
| } |
| |
| return kvs |
| } |