blob: 3d90bfaa75029e5704328f34835b1c82ebbdfd60 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cache
import (
"fmt"
"sync"
"time"
"github.com/apache/pulsar-client-go/oauth2"
"github.com/apache/pulsar-client-go/oauth2/store"
xoauth2 "golang.org/x/oauth2"
"github.com/apache/pulsar-client-go/oauth2/clock"
)
// A CachingTokenSource is anything that can return a token, and is backed by a cache.
type CachingTokenSource interface {
xoauth2.TokenSource
// InvalidateToken is called when the token is rejected by the resource server.
InvalidateToken() error
}
const (
// expiryDelta adjusts the token TTL to avoid using tokens which are almost expired
expiryDelta = time.Duration(60) * time.Second
)
// tokenCache implements a cache for the token associated with a specific audience.
// it interacts with the store when the access token is near expiration or invalidated.
// it is advisable to use a token cache instance per audience.
type tokenCache struct {
clock clock.Clock
lock sync.Mutex
store store.Store
audience string
refresher oauth2.AuthorizationGrantRefresher
token *xoauth2.Token
}
func NewDefaultTokenCache(store store.Store, audience string,
refresher oauth2.AuthorizationGrantRefresher) (CachingTokenSource, error) {
cache := &tokenCache{
clock: clock.RealClock{},
store: store,
audience: audience,
refresher: refresher,
}
return cache, nil
}
var _ CachingTokenSource = &tokenCache{}
// Token returns a valid access token, if available.
func (t *tokenCache) Token() (*xoauth2.Token, error) {
t.lock.Lock()
defer t.lock.Unlock()
// use the cached access token if it isn't expired
if t.token != nil && t.validateAccessToken(*t.token) {
return t.token, nil
}
// load from the store and use the access token if it isn't expired
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
return nil, fmt.Errorf("LoadGrant: %v", err)
}
t.token = grant.Token
if t.token != nil && t.validateAccessToken(*t.token) {
return t.token, nil
}
// obtain and cache a fresh access token
grant, err = t.refresher.Refresh(grant)
if err != nil {
return nil, fmt.Errorf("RefreshGrant: %v", err)
}
t.token = grant.Token
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
return nil, fmt.Errorf("SaveGrant: %v", err)
}
return t.token, nil
}
// InvalidateToken clears the access token (likely due to a response from the resource server).
// Note that the token within the grant may contain a refresh token which should survive.
func (t *tokenCache) InvalidateToken() error {
t.lock.Lock()
defer t.lock.Unlock()
previous := t.token
t.token = nil
// clear from the store the access token that was returned earlier (unless the store has since been updated)
if previous == nil || previous.AccessToken == "" {
return nil
}
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
return fmt.Errorf("LoadGrant: %v", err)
}
if grant.Token != nil && grant.Token.AccessToken == previous.AccessToken {
grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
return fmt.Errorf("SaveGrant: %v", err)
}
}
return nil
}
// validateAccessToken checks the validity of the cached access token
func (t *tokenCache) validateAccessToken(token xoauth2.Token) bool {
if token.AccessToken == "" {
return false
}
if !token.Expiry.IsZero() && t.clock.Now().After(token.Expiry.Round(0).Add(-expiryDelta)) {
return false
}
return true
}