blob: d505cc200f9e36a17873e60902f9ff11acf28c32 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Azure credentials and authentication."""
# pytype: skip-file
import logging
import threading
from apache_beam.options.pipeline_options import AzureOptions
try:
from azure.identity import DefaultAzureCredential
_AZURE_AUTH_AVAILABLE = True
except ImportError:
_AZURE_AUTH_AVAILABLE = False
_LOGGER = logging.getLogger(__name__)
def get_service_credentials(pipeline_options):
"""For internal use only; no backwards-compatibility guarantees.
Get credentials to access Azure services.
Args:
pipeline_options: Pipeline options, used in creating credentials
like managed identity credentials.
Returns:
A ``azure.identity.*Credential`` object or None if credentials
not found. Returned object is thread-safe.
"""
return _Credentials.get_service_credentials(pipeline_options)
class _Credentials(object):
_credentials_lock = threading.Lock()
_credentials_init = False
_credentials = None
@classmethod
def get_service_credentials(cls, pipeline_options):
with cls._credentials_lock:
if cls._credentials_init:
return cls._credentials
cls._credentials = cls._get_service_credentials(pipeline_options)
cls._credentials_init = True
return cls._credentials
@staticmethod
def _get_service_credentials(pipeline_options):
if not _AZURE_AUTH_AVAILABLE:
_LOGGER.warning(
'Unable to find default credentials because the azure.identity '
'library is not available. Install the azure.identity library to use '
'Azure default credentials.')
return None
try:
credentials = DefaultAzureCredential(
managed_identity_client_id=pipeline_options.view_as(AzureOptions)\
.azure_managed_identity_client_id)
_LOGGER.debug('Connecting using Azure Default Credentials.')
return credentials
except Exception as e:
_LOGGER.warning('Unable to find Azure credentials to use: %s\n', e)
return None