blob: 7b7533987a722920f9ea6ba8b550f1c90cd450ed [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"errors"
"fmt"
"net"
"net/url"
"strings"
log "github.com/sirupsen/logrus"
)
// Scheme tokens recognised in a service URI. The first three are service
// names (the scheme part before any '+'); SSLService is the service info that
// selects the TLS port for the binary protocol.
const (
	BinaryService = "pulsar"
	HTTPService   = "http"
	HTTPSService  = "https"
	SSLService    = "ssl"
)

// Default ports that getServicePort hands out when a host carries no explicit
// port.
const (
	BinaryPort    = 6650
	BinaryTLSPort = 6651
	HTTPPort      = 80
	HTTPSPort     = 443
)
// PulsarServiceURI is the parsed form of a service URI as produced by
// fromString and fromURL.
//
// NOTE: the constructors in this file fill the struct with positional
// composite literals, so the field order must not change.
type PulsarServiceURI struct {
// ServiceName is the lower-cased URL scheme up to the first '+'
// (empty when the URL has no scheme).
ServiceName string
// ServiceInfos holds the remaining '+'-separated parts of the scheme.
ServiceInfos []string
// ServiceHosts lists every host in the "host:port" form returned by
// validateHostName.
ServiceHosts []string
// servicePath is the Path of the parsed URL.
servicePath string
// URL is the parsed URL the other fields were derived from. In the
// multi-host bracketed branch of fromString it is parsed from the first
// host plus the path only.
URL *url.URL
}
// NewPulsarServiceURIFromURIString builds a PulsarServiceURI from the string
// form of a service URI by delegating to fromString.
//
// On failure the error is reported through log.Error and returned together
// with a nil result.
func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) {
	serviceURI, err := fromString(uri)
	if err == nil {
		return serviceURI, nil
	}
	log.Error(err)
	return nil, err
}
// NewPulsarServiceURIFromURL builds a PulsarServiceURI from an already parsed
// URL by delegating to fromURL.
//
// On failure the error is reported through log.Error and returned together
// with a nil result.
func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
	serviceURI, err := fromURL(url)
	if err == nil {
		return serviceURI, nil
	}
	log.Error(err)
	return nil, err
}
// fromString parses the string form of a service URI.
//
// An empty string is rejected. When the string contains both '[' and ']'
// (bracketed IPv6 literals) and splits into more than one piece on ',' or
// ';', the pieces are processed individually: the first piece, with any path
// found on the last piece appended, is parsed as a URL to obtain the service
// name, service infos and path, and every host is then normalised through
// validateHostName. All other input is passed to url.Parse and fromURL as a
// whole.
func fromString(uriStr string) (*PulsarServiceURI, error) {
	if len(uriStr) == 0 {
		return nil, errors.New("service uriStr string is null")
	}

	if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
		// deal with ipv6 address
		if pieces := strings.FieldsFunc(uriStr, splitURI); len(pieces) > 1 {
			head := pieces[0]
			tail := pieces[len(pieces)-1]
			// The path, if any, trails the last host; move it onto the
			// first host so that piece forms a complete URL on its own.
			if slash := strings.Index(tail, "/"); slash >= 0 {
				head += tail[slash:]
			}

			parsed, err := url.Parse(head)
			if err != nil {
				return nil, err
			}
			base, err := fromURL(parsed)
			if err != nil {
				return nil, err
			}

			// First host as already normalised by fromURL, followed by the
			// remaining raw pieces.
			candidates := append([]string{base.ServiceHosts[0]}, pieces[1:]...)
			var validated []string
			for _, candidate := range candidates {
				host, err := validateHostName(base.ServiceName, base.ServiceInfos, candidate)
				if err != nil {
					return nil, err
				}
				validated = append(validated, host)
			}

			return &PulsarServiceURI{
				ServiceName:  base.ServiceName,
				ServiceInfos: base.ServiceInfos,
				ServiceHosts: validated,
				servicePath:  base.servicePath,
				URL:          base.URL,
			}, nil
		}
	}

	parsed, err := url.Parse(uriStr)
	if err != nil {
		return nil, err
	}
	return fromURL(parsed)
}
// fromURL converts a parsed URL into a PulsarServiceURI.
//
// A nil URL or a URL with an empty Host is rejected. The lower-cased scheme
// is split on '+': the first part becomes the service name and the rest the
// service infos. Host is split on ',' and ';' and each entry is normalised
// with validateHostName; the first failure aborts the conversion.
func fromURL(url *url.URL) (*PulsarServiceURI, error) {
	if url == nil {
		return nil, errors.New("service url instance is null")
	}
	if len(url.Host) == 0 {
		return nil, errors.New("service host is null")
	}

	var (
		name  string
		infos []string
	)
	if url.Scheme != "" {
		parts := strings.Split(strings.ToLower(url.Scheme), "+")
		name, infos = parts[0], parts[1:]
	}

	var validated []string
	for _, raw := range strings.FieldsFunc(url.Host, splitURI) {
		host, err := validateHostName(name, infos, raw)
		if err != nil {
			return nil, err
		}
		validated = append(validated, host)
	}

	return &PulsarServiceURI{
		ServiceName:  name,
		ServiceInfos: infos,
		ServiceHosts: validated,
		servicePath:  url.Path,
		URL:          url,
	}, nil
}
// splitURI reports whether r is one of the separators (',' or ';') that
// delimit hosts in a service URI. It is meant to be passed to
// strings.FieldsFunc.
func splitURI(r rune) bool {
	switch r {
	case ',', ';':
		return true
	default:
		return false
	}
}
// validateHostName normalises a single host entry of a service URI to
// "host:port" form.
//
// hostname is parsed as the authority of a throw-away URL, so it may carry a
// port and a trailing path; only the host and port are used. When no port is
// given, the default for serviceName/serviceInfos is taken from
// getServicePort. Bracketed hosts (IPv6 literals) are returned with their
// brackets.
//
// An error is returned when hostname does not parse, when the host part is
// empty, when no port is given and no default is known for the service, or
// when the assembled result is rejected by net.SplitHostPort.
func validateHostName(serviceName string, serviceInfos []string, hostname string) (string, error) {
	uri, err := url.Parse("dummyscheme://" + hostname)
	if err != nil {
		return "", err
	}

	// Check for an empty host before re-adding brackets so that an empty
	// bracket pair ("[]") is not mistaken for a valid host.
	host := uri.Hostname()
	if host == "" {
		return "", errors.New("Invalid hostname : " + hostname)
	}
	// Hostname() strips the brackets of an IPv6 literal; restore them. The
	// decision is taken on the parsed authority rather than on the raw input,
	// because the raw input may include a path that itself contains '[' and
	// ']'.
	if strings.HasPrefix(uri.Host, "[") {
		host = "[" + host + "]"
	}

	port := uri.Port()
	if port == "" {
		p := getServicePort(serviceName, serviceInfos)
		if p == -1 {
			return "", fmt.Errorf("invalid port : %d", p)
		}
		port = fmt.Sprint(p)
	}

	result := host + ":" + port
	// Final sanity check of the assembled address.
	if _, _, err = net.SplitHostPort(result); err != nil {
		return "", err
	}
	return result, nil
}
func getServicePort(serviceName string, serviceInfos []string) int {
switch strings.ToLower(serviceName) {
case BinaryService:
if len(serviceInfos) == 0 {
return BinaryPort
} else if len(serviceInfos) == 1 && strings.ToLower(serviceInfos[0]) == SSLService {
return BinaryTLSPort
}
case HTTPService:
return HTTPPort
case HTTPSService:
return HTTPSPort
}
return -1
}