blob: 24d73dbe398a579e20490ed17683c564d0ac02f8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"errors"
"fmt"
"net/url"
"github.com/gogo/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)
// LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
type LookupResult struct {
LogicalAddr *url.URL
PhysicalAddr *url.URL
}
// GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
type GetTopicsOfNamespaceMode string
const (
Persistent GetTopicsOfNamespaceMode = "PERSISTENT"
NonPersistent = "NON_PERSISTENT"
All = "ALL"
)
// PartitionedTopicMetadata encapsulates a struct for metadata of a partitioned topic
type PartitionedTopicMetadata struct {
Partitions int `json:"partitions"` // Number of partitions for the topic
}
// LookupService is a interface of lookup service.
type LookupService interface {
// Lookup perform a lookup for the given topic, confirm the location of the broker
// where the topic is located, and return the LookupResult.
Lookup(topic string) (*LookupResult, error)
// GetPartitionedTopicMetadata perform a CommandPartitionedTopicMetadata request for
// the given topic, returns the CommandPartitionedTopicMetadataResponse as the result.
GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata, error)
// GetTopicsOfNamespace returns all the topics name for a given namespace.
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
// GetSchema returns schema for a given version.
GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
// Closable Allow Lookup Service's internal client to be able to closed
Closable
}
type lookupService struct {
rpcClient RPCClient
serviceNameResolver ServiceNameResolver
tlsEnabled bool
listenerName string
log log.Logger
metrics *Metrics
}
// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
listenerName: listenerName,
}
}
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
id := ls.rpcClient.NewRequestID()
req := &pb.CommandGetSchema{
RequestId: proto.Uint64(id),
Topic: proto.String(topic),
SchemaVersion: schemaVersion,
}
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
if err != nil {
return nil, err
}
if res.Response.Error != nil {
return nil, errors.New(res.Response.GetError().String())
}
return res.Response.GetSchemaResponse.Schema, nil
}
func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
physicalAddress *url.URL, err error) {
if ls.tlsEnabled {
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrlTls())
} else {
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
}
if err != nil {
return nil, nil, err
}
var physicalAddr *url.URL
if lr.GetProxyThroughServiceUrl() {
physicalAddr, err = ls.serviceNameResolver.ResolveHost()
if err != nil {
return nil, nil, err
}
} else {
physicalAddr = logicalAddress
}
return logicalAddress, physicalAddr, nil
}
// Follow brokers redirect up to certain number of times
const lookupResultMaxRedirect = 20
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
ls.metrics.LookupRequestsCount.Inc()
id := ls.rpcClient.NewRequestID()
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
RequestId: &id,
Topic: &topic,
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(ls.listenerName),
})
if err != nil {
return nil, err
}
ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)
for i := 0; i < lookupResultMaxRedirect; i++ {
lr := res.Response.LookupTopicResponse
switch *lr.Response {
case pb.CommandLookupTopicResponse_Redirect:
logicalAddress, physicalAddr, err := ls.getBrokerAddress(lr)
if err != nil {
return nil, err
}
ls.log.Debugf("Follow topic{%s} redirect to broker. %v / %v - Use proxy: %v",
topic, lr.BrokerServiceUrl, lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
id := ls.rpcClient.NewRequestID()
res, err = ls.rpcClient.Request(logicalAddress, physicalAddr, id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
AdvertisedListenerName: proto.String(ls.listenerName),
})
if err != nil {
return nil, err
}
// Process the response at the top of the loop
continue
case pb.CommandLookupTopicResponse_Connect:
ls.log.Debugf("Successfully looked up topic{%s} on broker. %s / %s - Use proxy: %t",
topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
logicalAddress, physicalAddress, err := ls.getBrokerAddress(lr)
if err != nil {
return nil, err
}
return &LookupResult{
LogicalAddr: logicalAddress,
PhysicalAddr: physicalAddress,
}, nil
case pb.CommandLookupTopicResponse_Failed:
ls.log.WithFields(log.Fields{
"topic": topic,
"error": lr.GetError(),
"message": lr.GetMessage(),
}).Warn("Failed to lookup topic")
return nil, errors.New(lr.GetError().String())
}
}
return nil, errors.New("exceeded max number of redirection during topic lookup")
}
func (ls *lookupService) GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata,
error) {
ls.metrics.PartitionedTopicMetadataRequestsCount.Inc()
topicName, err := ParseTopicName(topic)
if err != nil {
return nil, err
}
id := ls.rpcClient.NewRequestID()
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
&pb.CommandPartitionedTopicMetadata{
RequestId: &id,
Topic: &topicName.Name,
})
if err != nil {
return nil, err
}
ls.log.Debugf("Got topic{%s} partitioned metadata response: %+v", topic, res)
var partitionedTopicMetadata PartitionedTopicMetadata
if res.Response.Error != nil {
return nil, errors.New(res.Response.GetError().String())
}
if res.Response.PartitionMetadataResponse != nil {
if res.Response.PartitionMetadataResponse.Error != nil {
return nil, errors.New(res.Response.PartitionMetadataResponse.GetError().String())
}
partitionedTopicMetadata.Partitions = int(res.Response.PartitionMetadataResponse.GetPartitions())
} else {
return nil, fmt.Errorf("no partitioned metadata for topic{%s} in lookup response", topic)
}
return &partitionedTopicMetadata, nil
}
func (ls *lookupService) GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) {
id := ls.rpcClient.NewRequestID()
pbMode := pb.CommandGetTopicsOfNamespace_Mode(pb.CommandGetTopicsOfNamespace_Mode_value[string(mode)])
req := &pb.CommandGetTopicsOfNamespace{
RequestId: proto.Uint64(id),
Namespace: proto.String(namespace),
Mode: &pbMode,
}
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_TOPICS_OF_NAMESPACE, req)
if err != nil {
return nil, err
}
if res.Response.Error != nil {
return []string{}, errors.New(res.Response.GetError().String())
}
return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
}
func (ls *lookupService) Close() {}
const HTTPLookupServiceBasePathV1 string = "/lookup/v2/destination/"
const HTTPLookupServiceBasePathV2 string = "/lookup/v2/topic/"
const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
type httpLookupData struct {
BrokerURL string `json:"brokerUrl"`
BrokerURLTLS string `json:"brokerUrlTls"`
HTTPURL string `json:"httpUrl"`
HTTPURLTLS string `json:"httpUrlTls"`
}
type httpLookupService struct {
httpClient HTTPClient
serviceNameResolver ServiceNameResolver
tlsEnabled bool
log log.Logger
metrics *Metrics
}
func (h *httpLookupService) getBrokerAddress(ld *httpLookupData) (logicalAddress *url.URL,
physicalAddress *url.URL, err error) {
if h.tlsEnabled {
logicalAddress, err = url.ParseRequestURI(ld.BrokerURLTLS)
} else {
logicalAddress, err = url.ParseRequestURI(ld.BrokerURL)
}
if err != nil {
return nil, nil, err
}
return logicalAddress, logicalAddress, nil
}
func (h *httpLookupService) Lookup(topic string) (*LookupResult, error) {
topicName, err := ParseTopicName(topic)
if err != nil {
return nil, err
}
basePath := HTTPLookupServiceBasePathV2
if !IsV2TopicName(topicName) {
basePath = HTTPLookupServiceBasePathV1
}
lookupData := &httpLookupData{}
err = h.httpClient.Get(basePath+GetTopicRestPath(topicName), lookupData, nil)
if err != nil {
return nil, err
}
h.log.Debugf("Successfully looked up topic{%s} on http broker. %+v",
topic, lookupData)
logicalAddress, physicalAddress, err := h.getBrokerAddress(lookupData)
if err != nil {
return nil, err
}
return &LookupResult{
LogicalAddr: logicalAddress,
PhysicalAddr: physicalAddress,
}, nil
}
func (h *httpLookupService) GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata,
error) {
topicName, err := ParseTopicName(topic)
if err != nil {
return nil, err
}
format := HTTPAdminServiceV2Format
if !IsV2TopicName(topicName) {
format = HTTPAdminServiceV1Format
}
path := fmt.Sprintf(format, GetTopicRestPath(topicName))
tMetadata := &PartitionedTopicMetadata{}
err = h.httpClient.Get(path, tMetadata, map[string]string{"checkAllowAutoCreation": "true"})
if err != nil {
return nil, err
}
h.log.Debugf("Got topic{%s} partitioned metadata response: %+v", topic, tMetadata)
return tMetadata, nil
}
func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) {
format := HTTPTopicUnderNamespaceV2
if !IsV2Namespace(namespace) {
format = HTTPTopicUnderNamespaceV1
}
path := fmt.Sprintf(format, namespace, string(mode))
topics := []string{}
err := h.httpClient.Get(path, &topics, nil)
if err != nil {
return nil, err
}
h.log.Debugf("Got namespace{%s} mode{%s} topics response: %+v", namespace, mode, topics)
return topics, nil
}
func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
return nil, errors.New("GetSchema is not supported by httpLookupService")
}
func (h *httpLookupService) Close() {
h.httpClient.Close()
}
// NewHTTPLookupService init a http based lookup service struct and return an object of LookupService.
func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
return &httpLookupService{
httpClient: httpClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}