blob: 9ee63ab850c22544a91b0b0684a6ec3538261834 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package auth
import (
	"crypto/tls"
	"errors"
	"fmt"

	"github.com/apache/pulsar-client-go/oauth2"
	"github.com/apache/pulsar-client-go/oauth2/cache"
	"github.com/apache/pulsar-client-go/oauth2/clock"
	"github.com/apache/pulsar-client-go/oauth2/store"
)
// Keys (and one accepted value) of the string map consumed by
// NewAuthenticationOAuth2WithParams.
const (
// ConfigParamType is the map key whose value selects the authentication flow.
ConfigParamType = "type"
// ConfigParamTypeClientCredentials is the ConfigParamType value that selects
// the client credentials flow.
ConfigParamTypeClientCredentials = "client_credentials"
// ConfigParamIssuerURL is the map key for the issuer endpoint.
ConfigParamIssuerURL = "issuerUrl"
// ConfigParamAudience is the map key for the audience; the audience is also
// the name under which the grant is saved in the store.
ConfigParamAudience = "audience"
// ConfigParamKeyFile is the map key whose value is passed as the KeyFile
// option of the client credentials flow.
ConfigParamKeyFile = "privateKey"
// ConfigParamClientID is the map key for the client ID.
ConfigParamClientID = "clientId"
)
// oauth2AuthProvider supplies OAuth2 access tokens as authentication data.
type oauth2AuthProvider struct {
// clock is handed to the grant refresher created in getRefresher.
clock clock.Clock
// issuer describes the token issuer; its Audience is the key used to load
// the grant from store.
issuer oauth2.Issuer
// store holds the authorization grants.
store store.Store
// source is set by Init; while nil, GetData returns no data (anonymous access).
source cache.CachingTokenSource
}
// NewAuthenticationOAuth2WithParams returns a Provider configured from a
// string map.
//
// The issuer is built from the ConfigParamIssuerURL, ConfigParamClientID and
// ConfigParamAudience entries. The only supported ConfigParamType value is
// ConfigParamTypeClientCredentials; any other value yields an error. For the
// client credentials type, a flow is created from the ConfigParamKeyFile
// entry, authorized for the audience, and the resulting grant is saved in a
// new in-memory store under that audience. Errors from any of these steps are
// returned unchanged.
func NewAuthenticationOAuth2WithParams(params map[string]string) (Provider, error) {
	issuer := oauth2.Issuer{
		IssuerEndpoint: params[ConfigParamIssuerURL],
		ClientID:       params[ConfigParamClientID],
		Audience:       params[ConfigParamAudience],
	}

	// In-memory store that receives the grant obtained below.
	grants := store.NewMemoryStore()

	authType := params[ConfigParamType]
	if authType != ConfigParamTypeClientCredentials {
		return nil, fmt.Errorf("unsupported authentication type: %s", authType)
	}

	opts := oauth2.ClientCredentialsFlowOptions{
		KeyFile:          params[ConfigParamKeyFile],
		AdditionalScopes: nil,
	}
	flow, err := oauth2.NewDefaultClientCredentialsFlow(opts)
	if err != nil {
		return nil, err
	}

	grant, err := flow.Authorize(issuer.Audience)
	if err != nil {
		return nil, err
	}

	if err := grants.SaveGrant(issuer.Audience, *grant); err != nil {
		return nil, err
	}

	return NewAuthenticationOAuth2(issuer, grants), nil
}
// NewAuthenticationOAuth2 returns a Provider for the given issuer that reads
// its authorization grant from store. The provider uses the real clock and
// has no token source until Init is called.
func NewAuthenticationOAuth2(
	issuer oauth2.Issuer,
	store store.Store) Provider {
	provider := new(oauth2AuthProvider)
	provider.clock = clock.RealClock{}
	provider.issuer = issuer
	provider.store = store
	return provider
}
// Init prepares the token source from the grant held in the store.
//
// The grant is loaded under the issuer's audience. When the store reports
// store.ErrNoAuthenticationData, Init succeeds and leaves the token source
// unset, so GetData returns no data (anonymous access). Any other load
// error, an unsupported grant type, or a failure to create the token cache
// is returned to the caller.
func (p *oauth2AuthProvider) Init() error {
	grant, err := p.store.LoadGrant(p.issuer.Audience)
	if err != nil {
		// errors.Is also recognizes the sentinel when a store wraps it with
		// additional context, which a plain == comparison would miss.
		if errors.Is(err, store.ErrNoAuthenticationData) {
			return nil
		}
		return err
	}

	refresher, err := p.getRefresher(grant.Type)
	if err != nil {
		return err
	}

	source, err := cache.NewDefaultTokenCache(p.store, p.issuer.Audience, refresher)
	if err != nil {
		return err
	}
	p.source = source
	return nil
}
// Name returns the authentication method name reported by this provider,
// which is always "token".
func (p *oauth2AuthProvider) Name() string {
	const methodName = "token"
	return methodName
}
// GetTLSCertificate always returns a nil certificate and a nil error: this
// provider supplies its credentials through GetData, not a TLS certificate.
func (p *oauth2AuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
	var noCert *tls.Certificate
	return noCert, nil
}
// GetData returns the current access token as bytes.
//
// When no token source has been set up (Init found no grant, or was not
// called), it returns nil data and a nil error, which means anonymous
// access. An error from the token source is returned unchanged.
func (p *oauth2AuthProvider) GetData() ([]byte, error) {
	// No token source: anonymous access.
	if p.source == nil {
		return nil, nil
	}

	tok, tokErr := p.source.Token()
	if tokErr != nil {
		return nil, tokErr
	}

	accessToken := tok.AccessToken
	return []byte(accessToken), nil
}
// Close does nothing and always returns nil.
func (p *oauth2AuthProvider) Close() error {
	var noErr error
	return noErr
}
// getRefresher returns the grant refresher matching grant type t, built with
// the provider's clock. Client credentials and device code grants are
// supported; any other type yields store.ErrUnsupportedAuthData.
func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType) (oauth2.AuthorizationGrantRefresher, error) {
	if t == oauth2.GrantTypeClientCredentials {
		return oauth2.NewDefaultClientCredentialsGrantRefresher(p.clock)
	}
	if t == oauth2.GrantTypeDeviceCode {
		return oauth2.NewDefaultDeviceAuthorizationGrantRefresher(p.clock)
	}
	return nil, store.ErrUnsupportedAuthData
}