| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package google |
| |
| import ( |
| "bytes" |
| "crypto/tls" |
| "crypto/x509" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "net/http" |
| "net/http/httputil" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "istio.io/pkg/env" |
| "istio.io/pkg/log" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pkg/security" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/stsservice" |
| "github.com/apache/dubbo-go-pixiu/security/pkg/util" |
| ) |
| |
// Request tuning knobs.
const (
	// httpTimeOutInSec is the timeout, in seconds, applied to the plugin's HTTP client.
	httpTimeOutInSec = 5
	// maxRequestRetry is the maximum number of attempts made by sendRequestWithRetry.
	maxRequestRetry = 5
	// cacheHitDivisor throttles cache-hit debug logging to one message per this many hits.
	cacheHitDivisor = 50
)

// Literals placed on the wire.
const (
	// contentType is the Content-Type header value of outgoing token requests.
	contentType = "application/json"
	// scope is the OAuth scope requested when the caller does not supply one.
	scope = "https://www.googleapis.com/auth/cloud-platform"
	// tokenType is sent as the requested token type and reported as the issued token type.
	tokenType = "urn:ietf:params:oauth:token-type:access_token"
)

// Token cache keys; the same strings are recorded as the TokenType of cached entries.
const (
	federatedToken = "federated token"
	accessToken    = "access token"
)

// GCPAuthProvider is the XDS auth provider name for which GetMetadata adds the
// x-goog-user-project header.
const GCPAuthProvider = "gcp"
| |
| var ( |
| pluginLog = log.RegisterScope("token", "token manager plugin debugging", 0) |
| federatedTokenEndpoint = "https://sts.googleapis.com/v1/token" |
| accessTokenEndpoint = "https://iamcredentials.googleapis.com/v1/projects/-/" + |
| "serviceAccounts/service-%s@gcp-sa-meshdataplane.iam.gserviceaccount.com:generateAccessToken" |
| // default grace period in seconds of an access token. If caching is enabled and token remaining life time is |
| // within this period, refresh access token. |
| defaultGracePeriod = 300 |
| GCEProvider = "GoogleComputeEngine" |
| // GKEClusterURL is the URL to send requests to the token exchange service. |
| GKEClusterURL = env.RegisterStringVar("GKE_CLUSTER_URL", "", "The url of GKE cluster").Get() |
| ) |
| |
| // Plugin supports token exchange with Google OAuth 2.0 authorization server. |
| type Plugin struct { |
| httpClient *http.Client |
| credFetcher security.CredFetcher |
| trustDomain string |
| // tokens is the cache for fetched tokens. |
| // map key is token type, map value is tokenInfo. |
| tokens sync.Map |
| gcpProjectNumber string |
| gkeClusterURL string |
| enableCache bool |
| |
| // Counts numbers of access token cache hits. |
| mutex sync.RWMutex |
| accessTokenCacheHit uint64 |
| } |
| |
| // CreateTokenManagerPlugin creates a plugin that fetches token from a Google OAuth 2.0 authorization server. |
| func CreateTokenManagerPlugin(credFetcher security.CredFetcher, trustDomain, gcpProjectNumber, gkeClusterURL string, enableCache bool) (*Plugin, error) { |
| caCertPool, err := x509.SystemCertPool() |
| if err != nil { |
| pluginLog.Errorf("Failed to get SystemCertPool: %v", err) |
| return nil, err |
| } |
| p := &Plugin{ |
| httpClient: &http.Client{ |
| Timeout: httpTimeOutInSec * time.Second, |
| Transport: &http.Transport{ |
| TLSClientConfig: &tls.Config{ |
| RootCAs: caCertPool, |
| }, |
| }, |
| }, |
| credFetcher: credFetcher, |
| trustDomain: trustDomain, |
| gcpProjectNumber: gcpProjectNumber, |
| gkeClusterURL: gkeClusterURL, |
| enableCache: enableCache, |
| } |
| return p, nil |
| } |
| |
// federatedTokenResponse is the JSON body decoded from the federated token endpoint.
type federatedTokenResponse struct {
	// AccessToken is the federated token; it is later sent as the bearer credential
	// of the access token request.
	AccessToken string `json:"access_token"`
	// IssuedTokenType is decoded from the "issued_token_type" field.
	IssuedTokenType string `json:"issued_token_type"`
	// TokenType is decoded from the "token_type" field.
	TokenType string `json:"token_type"`
	// ExpiresIn is the expiration time in seconds.
	ExpiresIn int64 `json:"expires_in"`
}
| |
| // GenerateToken takes STS request parameters and fetches token, returns StsResponseParameters in JSON. |
| func (p *Plugin) ExchangeToken(parameters security.StsRequestParameters) ([]byte, error) { |
| if tokenSTS, ok := p.useCachedToken(); ok { |
| return tokenSTS, nil |
| } |
| pluginLog.Debugf("Start to fetch token with STS request parameters: %v", parameters) |
| ftResp, err := p.fetchFederatedToken(parameters) |
| if err != nil { |
| return nil, err |
| } |
| atResp, err := p.fetchAccessToken(ftResp) |
| if err != nil { |
| return nil, err |
| } |
| return p.generateSTSResp(atResp) |
| } |
| |
| // useCachedToken checks if there is a cached access token which is not going to expire soon. Returns |
| // cached token in STS response or false if token is not available. |
| func (p *Plugin) useCachedToken() ([]byte, bool) { |
| if !p.enableCache { |
| return nil, false |
| } |
| v, ok := p.tokens.Load(accessToken) |
| if !ok { |
| return nil, false |
| } |
| |
| var cacheHitCount uint64 |
| p.mutex.Lock() |
| p.accessTokenCacheHit++ |
| cacheHitCount = p.accessTokenCacheHit |
| p.mutex.Unlock() |
| |
| token := v.(stsservice.TokenInfo) |
| remainingLife := time.Until(token.ExpireTime) |
| if cacheHitCount%cacheHitDivisor == 0 { |
| pluginLog.Debugf("find a cached access token with remaining lifetime: %s (number of cache hits: %d)", |
| remainingLife.String(), cacheHitCount) |
| } |
| if remainingLife > time.Duration(defaultGracePeriod)*time.Second { |
| expireInSec := int64(remainingLife.Seconds()) |
| if tokenSTS, err := p.generateSTSRespInner(token.Token, expireInSec); err == nil { |
| if cacheHitCount%cacheHitDivisor == 0 { |
| pluginLog.Debugf("generated an STS response using a cached access token") |
| } |
| return tokenSTS, true |
| } |
| } |
| return nil, false |
| } |
| |
| // Construct the audience field for GetFederatedToken request. |
| func (p *Plugin) constructAudience(subjectToken string) string { |
| provider := "" |
| if p.credFetcher != nil { |
| provider = p.credFetcher.GetIdentityProvider() |
| } |
| // For GKE, we do not register IdentityProvider explicitly. The provider name |
| // is GKEClusterURL by default. |
| if provider == "" { |
| if GKEClusterURL != "" { |
| provider = GKEClusterURL |
| } else { |
| provider = p.gkeClusterURL |
| } |
| } |
| |
| var identityNS string |
| // Prefer to use the identity namespace from the token audience. The trust domain |
| // could configured differently from the identity namespace. |
| // Note the token exchange request would fail anyway if the identity namespace is not |
| // matched with the audience of the token. |
| if audiences, err := util.GetAud(subjectToken); len(audiences) == 1 && audiences[0] != "" { |
| identityNS = audiences[0] |
| } else { |
| pluginLog.Errorf("expect only 1 non-empty audience in token but found %v with error %v. Fallback to use trust domain %q as the identity namespace.", |
| audiences, err, p.trustDomain) |
| identityNS = p.trustDomain |
| } |
| |
| return fmt.Sprintf("identitynamespace:%s:%s", identityNS, provider) |
| } |
| |
| // constructFederatedTokenRequest returns an HTTP request for federated token. |
| // Example of a federated token request: |
| // POST https://sts.googleapis.com/v1/token |
| // Content-Type: application/json |
| // |
| // { |
| // audience: <trust domain>:<provider> |
| // grantType: urn:ietf:params:oauth:grant-type:token-exchange |
| // requestedTokenType: urn:ietf:params:oauth:token-type:access_token |
| // subjectTokenType: urn:ietf:params:oauth:token-type:jwt |
| // subjectToken: <jwt token> |
| // Scope: https://www.googleapis.com/auth/cloud-platform |
| // } |
| func (p *Plugin) constructFederatedTokenRequest(parameters security.StsRequestParameters) (*http.Request, error) { |
| reqScope := scope |
| if len(parameters.Scope) != 0 { |
| reqScope = parameters.Scope |
| } |
| aud := p.constructAudience(parameters.SubjectToken) |
| query := map[string]string{ |
| "audience": aud, |
| "grantType": parameters.GrantType, |
| "requestedTokenType": tokenType, |
| "subjectTokenType": parameters.SubjectTokenType, |
| "subjectToken": parameters.SubjectToken, |
| "scope": reqScope, |
| } |
| jsonQuery, err := json.Marshal(query) |
| if err != nil { |
| return nil, fmt.Errorf("failed to marshal query for get federated token request: %+v", err) |
| } |
| req, err := http.NewRequest("POST", federatedTokenEndpoint, bytes.NewBuffer(jsonQuery)) |
| if err != nil { |
| return req, fmt.Errorf("failed to create get federated token request: %+v", err) |
| } |
| req.Header.Set("Content-Type", contentType) |
| if pluginLog.DebugEnabled() { |
| dQuery := map[string]string{ |
| "audience": aud, |
| "grantType": parameters.GrantType, |
| "requestedTokenType": tokenType, |
| "subjectTokenType": parameters.SubjectTokenType, |
| "subjectToken": "redacted", |
| "scope": reqScope, |
| } |
| dJSONQuery, _ := json.Marshal(dQuery) |
| dReq, _ := http.NewRequest("POST", federatedTokenEndpoint, bytes.NewBuffer(dJSONQuery)) |
| dReq.Header.Set("Content-Type", contentType) |
| |
| reqDump, _ := httputil.DumpRequest(dReq, true) |
| pluginLog.Debugf("Prepared federated token request: \n%s", string(reqDump)) |
| } else { |
| pluginLog.Infof("Prepared federated token request for aud %q", aud) |
| } |
| return req, nil |
| } |
| |
| // fetchFederatedToken exchanges a third-party issued Json Web Token for an OAuth2.0 access token |
| // which asserts a third-party identity within an identity namespace. |
| func (p *Plugin) fetchFederatedToken(parameters security.StsRequestParameters) (*federatedTokenResponse, error) { |
| respData := &federatedTokenResponse{} |
| |
| req, err := p.constructFederatedTokenRequest(parameters) |
| if err != nil { |
| pluginLog.Errorf("failed to create get federated token request: %+v", err) |
| return nil, err |
| } |
| resp, timeElapsed, err := p.sendRequestWithRetry(req) |
| if err != nil { |
| respCode := 0 |
| if resp != nil { |
| respCode = resp.StatusCode |
| } |
| pluginLog.Errorf("Failed to exchange federated token (HTTP status %d, total time elapsed %s): %v", |
| respCode, timeElapsed.String(), err) |
| return nil, fmt.Errorf("failed to exchange federated token (HTTP status %d): %v", respCode, |
| err) |
| } |
| // resp should not be nil. |
| defer resp.Body.Close() |
| |
| if pluginLog.DebugEnabled() { |
| respDump, _ := httputil.DumpResponse(resp, false) |
| pluginLog.Debugf("Received federated token response after %s: \n%s", |
| timeElapsed.String(), string(respDump)) |
| } |
| |
| body, err := io.ReadAll(resp.Body) |
| if err != nil { |
| pluginLog.Errorf("Failed to read federated token response body: %+v", err) |
| return respData, fmt.Errorf("failed to read federated token response body: %+v", err) |
| } |
| if err := json.Unmarshal(body, respData); err != nil { |
| pluginLog.Errorf("Failed to unmarshal federated token response data: %v", err) |
| return respData, fmt.Errorf("failed to unmarshal federated token response data: %v", err) |
| } |
| if respData.AccessToken == "" { |
| pluginLog.Error("federated token response does not have access token", string(body)) |
| return respData, errors.New("federated token response does not have access token. " + string(body)) |
| } |
| pluginLog.WithLabels("latency", timeElapsed.String(), "ttl", respData.ExpiresIn).Infof("fetched federated token") |
| tokenReceivedTime := time.Now() |
| p.tokens.Store(federatedToken, stsservice.TokenInfo{ |
| TokenType: federatedToken, |
| IssueTime: tokenReceivedTime, |
| ExpireTime: tokenReceivedTime.Add(time.Duration(respData.ExpiresIn) * time.Second), |
| }) |
| return respData, nil |
| } |
| |
| // Send HTTP request every 0.01 seconds until successfully receive response or hit max retry numbers. |
| // If response code is 4xx, return immediately without retry. |
| func (p *Plugin) sendRequestWithRetry(req *http.Request) (resp *http.Response, elapsedTime time.Duration, err error) { |
| start := time.Now() |
| for i := 0; i < maxRequestRetry; i++ { |
| resp, err = p.httpClient.Do(req) |
| if err != nil { |
| pluginLog.Errorf("failed to send out request: %v (response: %v)", err, resp) |
| } |
| if resp != nil && resp.StatusCode == http.StatusOK { |
| return resp, time.Since(start), err |
| } |
| if resp != nil && resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError { |
| return resp, time.Since(start), err |
| } |
| time.Sleep(10 * time.Millisecond) |
| } |
| if resp != nil && resp.StatusCode != http.StatusOK { |
| bodyBytes, _ := io.ReadAll(resp.Body) |
| defer resp.Body.Close() |
| return resp, time.Since(start), fmt.Errorf("HTTP Status %d, body: %s", resp.StatusCode, string(bodyBytes)) |
| } |
| return resp, time.Since(start), err |
| } |
| |
// Duration is a span of time expressed in whole seconds; it is serialized as the
// "lifetime" of an access token request.
type Duration struct {
	// Seconds is the signed number of seconds in the span. It must lie within
	// -315,576,000,000 and +315,576,000,000 inclusive, bounds that come from
	// 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years.
	Seconds int64 `json:"seconds"`
}
| |
| type accessTokenRequest struct { |
| Name string `json:"name"` // nolint: structcheck, unused |
| Delegates []string `json:"delegates"` |
| Scope []string `json:"scope"` |
| LifeTime Duration `json:"lifetime"` // nolint: structcheck, unused |
| } |
| |
// accessTokenResponse is the JSON body decoded from the generateAccessToken endpoint.
type accessTokenResponse struct {
	// AccessToken is the issued access token.
	AccessToken string `json:"accessToken"`
	// ExpireTime is the expiry timestamp as text; consumers parse it as RFC 3339.
	ExpireTime string `json:"expireTime"`
}
| |
| // constructFederatedTokenRequest returns an HTTP request for access token. |
| // Example of an access token request: |
| // POST https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/ |
| // service-<GCP project number>@gcp-sa-meshdataplane.iam.gserviceaccount.com:generateAccessToken |
| // Content-Type: application/json |
| // Authorization: Bearer <federated token> |
| // |
| // { |
| // "Delegates": [], |
| // "Scope": [ |
| // https://www.googleapis.com/auth/cloud-platform |
| // ], |
| // } |
| func (p *Plugin) constructGenerateAccessTokenRequest(fResp *federatedTokenResponse) (*http.Request, error) { |
| // Request for access token with a lifetime of 3600 seconds. |
| query := accessTokenRequest{ |
| LifeTime: Duration{Seconds: 3600}, |
| } |
| query.Scope = append(query.Scope, scope) |
| |
| jsonQuery, err := json.Marshal(query) |
| if err != nil { |
| return nil, fmt.Errorf("failed to marshal query for get access token request: %+v", err) |
| } |
| endpoint := fmt.Sprintf(accessTokenEndpoint, p.gcpProjectNumber) |
| req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(jsonQuery)) |
| if err != nil { |
| return nil, fmt.Errorf("failed to create get access token request: %+v", err) |
| } |
| req.Header.Add("Content-Type", contentType) |
| if pluginLog.DebugEnabled() { |
| reqDump, _ := httputil.DumpRequest(req, true) |
| pluginLog.Debugf("Prepared access token request: \n%s", string(reqDump)) |
| } |
| req.Header.Add("Authorization", "Bearer "+fResp.AccessToken) |
| return req, nil |
| } |
| |
| func (p *Plugin) fetchAccessToken(federatedToken *federatedTokenResponse) (*accessTokenResponse, error) { |
| respData := &accessTokenResponse{} |
| |
| req, err := p.constructGenerateAccessTokenRequest(federatedToken) |
| if err != nil { |
| pluginLog.Errorf("failed to create get access token request: %+v", err) |
| return nil, err |
| } |
| resp, timeElapsed, err := p.sendRequestWithRetry(req) |
| if err != nil { |
| respCode := 0 |
| if resp != nil { |
| respCode = resp.StatusCode |
| } |
| pluginLog.Errorf("failed to exchange access token (HTTP status %d, total time elapsed %s): %v", |
| respCode, timeElapsed.String(), err) |
| return respData, fmt.Errorf("failed to exchange access token (HTTP status %d): %v", respCode, err) |
| } |
| defer resp.Body.Close() |
| |
| if pluginLog.DebugEnabled() { |
| respDump, _ := httputil.DumpResponse(resp, false) |
| pluginLog.Debugf("Received access token response after %s: \n%s", |
| timeElapsed.String(), string(respDump)) |
| } |
| |
| body, err := io.ReadAll(resp.Body) |
| if err != nil { |
| pluginLog.Errorf("Failed to read access token response body: %+v", err) |
| return respData, fmt.Errorf("failed to read access token response body: %+v", err) |
| } |
| if err := json.Unmarshal(body, respData); err != nil { |
| pluginLog.Errorf("Failed to unmarshal access token response data: %v", err) |
| return respData, fmt.Errorf("failed to unmarshal access token response data: %v", err) |
| } |
| if respData.AccessToken == "" { |
| pluginLog.Error("access token response does not have access token", string(body)) |
| return respData, errors.New("access token response does not have access token. " + string(body)) |
| } |
| pluginLog.Debug("successfully exchanged an access token") |
| // Store access token |
| // Default token life time is 3600 seconds. |
| tokenExp := time.Now().Add(3600 * time.Second) |
| exp, err := time.Parse(time.RFC3339Nano, respData.ExpireTime) |
| if err != nil { |
| pluginLog.Errorf("Failed to unmarshal timestamp %s from access token response, "+ |
| "fall back to use default lifetime (3600 seconds): %v", respData.ExpireTime, err) |
| } else { |
| tokenExp = exp |
| } |
| pluginLog.WithLabels("latency", timeElapsed.String(), "ttl", time.Until(tokenExp)).Infof("fetched access token") |
| // Update cache and reset cache hit counter. |
| p.tokens.Store(accessToken, stsservice.TokenInfo{ |
| TokenType: accessToken, |
| IssueTime: time.Now(), |
| ExpireTime: tokenExp, |
| Token: respData.AccessToken, |
| }) |
| p.mutex.Lock() |
| p.accessTokenCacheHit = 0 |
| p.mutex.Unlock() |
| return respData, nil |
| } |
| |
| // generateSTSResp takes accessTokenResponse and generates StsResponseParameters in JSON. |
| func (p *Plugin) generateSTSResp(atResp *accessTokenResponse) ([]byte, error) { |
| exp, err := time.Parse(time.RFC3339Nano, atResp.ExpireTime) |
| // Default token life time is 3600 seconds |
| var expireInSec int64 = 3600 |
| if err != nil { |
| pluginLog.Errorf("Failed to unmarshal timestamp %s from access token response, "+ |
| "fall back to use default lifetime (3600 seconds): %v", atResp.ExpireTime, err) |
| } else { |
| expireInSec = int64(time.Until(exp).Seconds()) |
| } |
| return p.generateSTSRespInner(atResp.AccessToken, expireInSec) |
| } |
| |
| func (p *Plugin) generateSTSRespInner(token string, expire int64) ([]byte, error) { |
| stsRespParam := stsservice.StsResponseParameters{ |
| AccessToken: token, |
| IssuedTokenType: tokenType, |
| TokenType: "Bearer", |
| ExpiresIn: expire, |
| } |
| statusJSON, err := json.MarshalIndent(stsRespParam, "", " ") |
| if pluginLog.DebugEnabled() { |
| stsRespParam.AccessToken = "redacted" |
| pluginLog.Debugf("Populated STS response parameters: %+v", stsRespParam) |
| } |
| return statusJSON, err |
| } |
| |
| // DumpTokenStatus dumps all token status in JSON |
| func (p *Plugin) DumpPluginStatus() ([]byte, error) { |
| tokenStatus := make([]stsservice.TokenInfo, 0) |
| p.tokens.Range(func(k interface{}, v interface{}) bool { |
| token := v.(stsservice.TokenInfo) |
| tokenStatus = append(tokenStatus, stsservice.TokenInfo{ |
| TokenType: token.TokenType, IssueTime: token.IssueTime, ExpireTime: token.ExpireTime, |
| }) |
| return true |
| }) |
| td := stsservice.TokensDump{ |
| Tokens: tokenStatus, |
| } |
| statusJSON, err := json.MarshalIndent(td, "", " ") |
| return statusJSON, err |
| } |
| |
| // GetMetadata returns the metadata headers related to the token |
| func (p *Plugin) GetMetadata(forCA bool, xdsAuthProvider, token string) (map[string]string, error) { |
| if token == "" { |
| return nil, fmt.Errorf("empty token in plugin GetMetadata") |
| } |
| gcpProjectNumber := p.GetGcpProjectNumber() |
| if !forCA && xdsAuthProvider == GCPAuthProvider && len(gcpProjectNumber) > 0 { |
| return map[string]string{ |
| "authorization": "Bearer " + token, |
| "x-goog-user-project": gcpProjectNumber, |
| }, nil |
| } |
| return map[string]string{ |
| "authorization": "Bearer " + token, |
| }, nil |
| } |
| |
| // SetEndpoints changes the endpoints for testing purposes only. |
| func (p *Plugin) SetEndpoints(fTokenEndpoint, aTokenEndpoint string) { |
| federatedTokenEndpoint = fTokenEndpoint |
| accessTokenEndpoint = aTokenEndpoint |
| } |
| |
| // GetGcpProjectNumber returns the GCP project number |
| func (p *Plugin) GetGcpProjectNumber() string { |
| return p.gcpProjectNumber |
| } |
| |
| // ClearCache is only used for testing purposes. |
| func (p *Plugin) ClearCache() { |
| p.tokens.Delete(federatedToken) |
| p.tokens.Delete(accessToken) |
| } |