blob: 1673f4dca33a1989d8bbd3451d195313176b7d80 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"errors"
"fmt"
"net/url"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
// lookupRequestsCount is the Prometheus counter behind the
// "pulsar_client_lookup_count" metric; Lookup increments it once per call.
var lookupRequestsCount = promauto.NewCounter(prometheus.CounterOpts{
	Name: "pulsar_client_lookup_count",
	Help: "Counter of lookup requests made by the client",
})
// LookupResult holds the outcome of a topic lookup as two broker addresses:
// LogicalAddr and PhysicalAddr.
type LookupResult struct {
// LogicalAddr is the broker service URL parsed from the lookup response.
LogicalAddr *url.URL
// PhysicalAddr is the address to connect to: the lookup service's configured
// service URL when the response asks to proxy through it, otherwise the same
// URL as LogicalAddr.
PhysicalAddr *url.URL
}
// LookupService is the interface of a topic lookup service.
type LookupService interface {
// Lookup performs a lookup for the given topic, determines the broker
// where the topic is located, and returns it as a LookupResult.
Lookup(topic string) (*LookupResult, error)
}
// lookupService is the LookupService implementation defined in this file; it
// sends lookup commands through an RPCClient.
type lookupService struct {
// rpcClient allocates request IDs and sends the lookup commands.
rpcClient RPCClient
// serviceURL is returned as the physical address when a lookup response
// requests proxying through the service URL.
serviceURL *url.URL
// tlsEnabled selects the TLS broker URL from a lookup response instead of
// the plain-text one.
tlsEnabled bool
}
// NewLookupService creates a LookupService that issues lookups through
// rpcClient. serviceURL is kept as the address to use when a response asks to
// proxy through the service URL, and tlsEnabled chooses between the TLS and
// plain broker URLs in lookup responses.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled bool) LookupService {
	svc := new(lookupService)
	svc.rpcClient = rpcClient
	svc.serviceURL = serviceURL
	svc.tlsEnabled = tlsEnabled
	return svc
}
// getBrokerAddress extracts the broker addresses from a lookup response.
//
// The logical address is parsed from the TLS broker URL when TLS is enabled
// and from the plain broker URL otherwise. The physical address is the
// configured service URL when the response requests proxying through it, and
// the logical address in every other case. A URL that fails to parse yields
// nil addresses together with the parse error.
func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
	physicalAddress *url.URL, err error) {
	var rawURL string
	if ls.tlsEnabled {
		rawURL = lr.GetBrokerServiceUrlTls()
	} else {
		rawURL = lr.GetBrokerServiceUrl()
	}

	broker, parseErr := url.ParseRequestURI(rawURL)
	if parseErr != nil {
		return nil, nil, parseErr
	}

	if lr.GetProxyThroughServiceUrl() {
		// Connect via the service URL while still addressing the broker logically.
		return broker, ls.serviceURL, nil
	}
	return broker, broker, nil
}
// lookupResultMaxRedirect bounds how many lookup responses Lookup processes
// while following broker redirects before it gives up with an error.
const lookupResultMaxRedirect = 20
// Lookup resolves the broker that serves topic.
//
// It increments the lookup counter, sends a non-authoritative lookup command
// to any broker, and then processes the responses:
//
//   - Redirect: the lookup is re-sent to the broker named in the response,
//     carrying over the response's Authoritative flag.
//   - Connect: the broker addresses are returned as a LookupResult.
//   - Failed: an error containing the server error (if any) is returned.
//
// At most lookupResultMaxRedirect responses are processed; after that an error
// is returned. RPC errors and broker URL parse errors are returned unchanged.
// A response with no lookup payload or response type, or with an unrecognized
// response type, produces an error instead of a panic or a silent retry.
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
	lookupRequestsCount.Inc()

	id := ls.rpcClient.NewRequestID()
	res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
		RequestId:     &id,
		Topic:         &topic,
		Authoritative: proto.Bool(false),
	})
	if err != nil {
		return nil, err
	}
	log.Debugf("Got topic{%s} lookup response: %+v", topic, res)

	for i := 0; i < lookupResultMaxRedirect; i++ {
		lr := res.Response.LookupTopicResponse
		// Both are pointers; guard them so a malformed response yields an
		// error rather than a nil-pointer panic on the dereference below.
		if lr == nil || lr.Response == nil {
			return nil, errors.New("lookup response is missing the response type")
		}

		switch *lr.Response {
		case pb.CommandLookupTopicResponse_Redirect:
			logicalAddress, physicalAddr, addrErr := ls.getBrokerAddress(lr)
			if addrErr != nil {
				return nil, addrErr
			}
			// Use the getters so the values are logged, not the addresses of
			// the underlying pointer fields.
			log.Debugf("Follow topic{%s} redirect to broker. %v / %v - Use proxy: %v",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())

			// A fresh variable per redirect: the command keeps a pointer to it.
			redirectID := ls.rpcClient.NewRequestID()
			res, err = ls.rpcClient.Request(logicalAddress, physicalAddr, redirectID, pb.BaseCommand_LOOKUP,
				&pb.CommandLookupTopic{
					RequestId:     &redirectID,
					Topic:         &topic,
					Authoritative: lr.Authoritative,
				})
			if err != nil {
				return nil, err
			}
			// The new response is processed by the next loop iteration.

		case pb.CommandLookupTopicResponse_Connect:
			log.Debugf("Successfully looked up topic{%s} on broker. %s / %s - Use proxy: %t",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())

			logicalAddress, physicalAddress, addrErr := ls.getBrokerAddress(lr)
			if addrErr != nil {
				return nil, addrErr
			}
			return &LookupResult{
				LogicalAddr:  logicalAddress,
				PhysicalAddr: physicalAddress,
			}, nil

		case pb.CommandLookupTopicResponse_Failed:
			errorMsg := ""
			if lr.Error != nil {
				errorMsg = lr.Error.String()
			}
			log.Warnf("Failed to lookup topic: %s, error msg: %s", topic, errorMsg)
			return nil, fmt.Errorf("failed to lookup topic: %s", errorMsg)

		default:
			// Without this, an unknown type would be re-examined until the
			// redirect budget ran out and then be misreported as too many
			// redirects.
			return nil, fmt.Errorf("unexpected lookup response type: %v", *lr.Response)
		}
	}

	return nil, errors.New("exceeded max number of redirection during topic lookup")
}