| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """``PTransforms`` that implement Google Cloud Data Loss Prevention |
| functionality. |
| """ |
| |
| from __future__ import absolute_import |
| |
| import logging |
| |
| from google.cloud import dlp_v2 |
| |
| from apache_beam.options.pipeline_options import GoogleCloudOptions |
| from apache_beam.transforms import DoFn |
| from apache_beam.transforms import ParDo |
| from apache_beam.transforms import PTransform |
| from apache_beam.utils.annotations import experimental |
| |
| __all__ = ['MaskDetectedDetails', 'InspectForDetails'] |
| |
| _LOGGER = logging.getLogger(__name__) |
| |
| |
| @experimental() |
| class MaskDetectedDetails(PTransform): |
| """Scrubs sensitive information detected in text. |
| The ``PTransform`` returns a ``PCollection`` of ``str`` |
| Example usage:: |
| |
| pipeline | MaskDetectedDetails(project='example-gcp-project', |
| deidentification_config={ |
| 'info_type_transformations: { |
| 'transformations': [{ |
| 'primitive_transformation': { |
| 'character_mask_config': { |
| 'masking_character': '#' |
| } |
| } |
| }] |
| } |
| }, inspection_config={'info_types': [{'name': 'EMAIL_ADDRESS'}]}) |
| |
| """ |
| def __init__( |
| self, |
| project=None, |
| deidentification_template_name=None, |
| deidentification_config=None, |
| inspection_template_name=None, |
| inspection_config=None, |
| timeout=None): |
| """Initializes a :class:`MaskDetectedDetails` transform. |
| |
| Args: |
| project: Optional. GCP project name in which inspection will be performed |
| deidentification_template_name (str): Either this or |
| `deidentification_config` required. Name of |
| deidentification template to be used on detected sensitive information |
| instances in text. |
| deidentification_config |
| (``Union[dict, google.cloud.dlp_v2.types.DeidentifyConfig]``): |
| Configuration for the de-identification of the content item. |
| If both template name and config are supplied, |
| config is more important. |
| inspection_template_name (str): This or `inspection_config` required. |
| Name of inspection template to be used |
| to detect sensitive data in text. |
| inspection_config |
| (``Union[dict, google.cloud.dlp_v2.types.InspectConfig]``): |
| Configuration for the inspector used to detect sensitive data in text. |
| If both template name and config are supplied, |
| config takes precedence. |
| timeout (float): Optional. The amount of time, in seconds, to wait for |
| the request to complete. |
| |
| """ |
| self.config = {} |
| self.project = project |
| self.timeout = timeout |
| if deidentification_template_name is not None \ |
| and deidentification_config is not None: |
| raise ValueError( |
| 'Both deidentification_template_name and ' |
| 'deidentification_config were specified.' |
| ' Please specify only one of these.') |
| elif deidentification_template_name is None \ |
| and deidentification_config is None: |
| raise ValueError( |
| 'deidentification_template_name or ' |
| 'deidentification_config must be specified.') |
| elif deidentification_template_name is not None: |
| self.config['deidentify_template_name'] = deidentification_template_name |
| else: |
| self.config['deidentify_config'] = deidentification_config |
| |
| if inspection_config is None and inspection_template_name is None: |
| raise ValueError( |
| 'inspection_template_name or inspection_config must be specified') |
| if inspection_template_name is not None: |
| self.config['inspect_template_name'] = inspection_template_name |
| if inspection_config is not None: |
| self.config['inspect_config'] = inspection_config |
| |
| def expand(self, pcoll): |
| if self.project is None: |
| self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project |
| if self.project is None: |
| raise ValueError( |
| 'GCP project name needs to be specified in "project" pipeline option') |
| return ( |
| pcoll |
| | ParDo(_DeidentifyFn(self.config, self.timeout, self.project))) |
| |
| |
| @experimental() |
| class InspectForDetails(PTransform): |
| """Inspects input text for sensitive information. |
| the ``PTransform`` returns a ``PCollection`` of |
| ``List[google.cloud.dlp_v2.proto.dlp_pb2.Finding]`` |
| Example usage:: |
| |
| pipeline | InspectForDetails(project='example-gcp-project', |
| inspection_config={'info_types': [{'name': 'EMAIL_ADDRESS'}]}) |
| """ |
| def __init__( |
| self, |
| project=None, |
| inspection_template_name=None, |
| inspection_config=None, |
| timeout=None): |
| """Initializes a :class:`InspectForDetails` transform. |
| |
| Args: |
| project: Optional. GCP project name in which inspection will be performed |
| inspection_template_name (str): This or `inspection_config` required. |
| Name of inspection template to be used |
| to detect sensitive data in text. |
| inspection_config |
| (``Union[dict, google.cloud.dlp_v2.types.InspectConfig]``): |
| Configuration for the inspector used to detect sensitive data in text. |
| If both template name and config are supplied, |
| config takes precedence. |
| timeout (float): Optional. The amount of time, in seconds, to wait for |
| the request to complete. |
| |
| """ |
| self.timeout = timeout |
| self.config = {} |
| self.project = project |
| if inspection_config is None and inspection_template_name is None: |
| raise ValueError( |
| 'inspection_template_name or inspection_config must be specified') |
| if inspection_template_name is not None: |
| self.config['inspect_template_name'] = inspection_template_name |
| if inspection_config is not None: |
| self.config['inspect_config'] = inspection_config |
| |
| def expand(self, pcoll): |
| if self.project is None: |
| self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project |
| if self.project is None: |
| raise ValueError( |
| 'GCP project name needs to be specified in "project" pipeline option') |
| return pcoll | ParDo(_InspectFn(self.config, self.timeout, self.project)) |
| |
| |
| class _DeidentifyFn(DoFn): |
| def __init__(self, config=None, timeout=None, project=None, client=None): |
| self.config = config |
| self.timeout = timeout |
| self.client = client |
| self.project = project |
| self.params = {} |
| |
| def setup(self): |
| if self.client is None: |
| self.client = dlp_v2.DlpServiceClient() |
| self.params = { |
| 'timeout': self.timeout, |
| 'parent': self.client.project_path(self.project) |
| } |
| self.params.update(self.config) |
| |
| def process(self, element, **kwargs): |
| operation = self.client.deidentify_content( |
| item={"value": element}, **self.params) |
| yield operation.item.value |
| |
| |
| class _InspectFn(DoFn): |
| def __init__(self, config=None, timeout=None, project=None): |
| self.config = config |
| self.timeout = timeout |
| self.client = None |
| self.project = project |
| self.params = {} |
| |
| def setup(self): |
| if self.client is None: |
| self.client = dlp_v2.DlpServiceClient() |
| self.params = { |
| 'timeout': self.timeout, |
| "parent": self.client.project_path(self.project) |
| } |
| self.params.update(self.config) |
| |
| def process(self, element, **kwargs): |
| operation = self.client.inspect_content( |
| item={"value": element}, **self.params) |
| hits = [x for x in operation.result.findings] |
| yield hits |