blob: fa76978e410207355c375ca27a3a37debe61f391 [file] [log] [blame]
:mod:`airflow.contrib.operators.mlengine_operator`
==================================================
.. py:module:: airflow.contrib.operators.mlengine_operator
Module Contents
---------------
.. data:: log
.. function:: _normalize_mlengine_job_id(job_id)
Replaces invalid MLEngine job_id characters with '_'.
This also adds a leading 'z' in case job_id starts with an invalid
character.
Args:
job_id: A job_id str that may have invalid characters.
Returns:
A valid job_id representation.
.. py:class:: MLEngineBatchPredictionOperator(project_id, job_id, region, data_format, input_paths, output_path, model_name=None, version_name=None, uri=None, max_worker_count=None, runtime_version=None, signature_name=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)
Bases: :class:`airflow.operators.BaseOperator`
Start a Google Cloud ML Engine prediction job.
NOTE: For model origin, users should consider exactly one from the
three options below:
1. Populate ``uri`` field only, which should be a GCS location that
points to a tensorflow savedModel directory.
2. Populate ``model_name`` field only, which refers to an existing
model, and the default version of the model will be used.
3. Populate both ``model_name`` and ``version_name`` fields, which
refers to a specific version of a specific model.
In options 2 and 3, both model and version name should contain the
minimal identifier. For instance, call::
MLEngineBatchPredictionOperator(
...,
model_name='my_model',
version_name='my_version',
...)
if the desired model version is
``projects/my_project/models/my_model/versions/my_version``.
See https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs
for further documentation on the parameters.
:param project_id: The Google Cloud project name where the
prediction job is submitted. (templated)
:type project_id: str
:param job_id: A unique id for the prediction job on Google Cloud
ML Engine. (templated)
:type job_id: str
:param data_format: The format of the input data.
It will default to 'DATA_FORMAT_UNSPECIFIED' if is not provided
or is not one of ["TEXT", "TF_RECORD", "TF_RECORD_GZIP"].
:type data_format: str
:param input_paths: A list of GCS paths of input data for batch
prediction. Accepting wildcard operator ``*``, but only at the end. (templated)
:type input_paths: list[str]
:param output_path: The GCS path where the prediction results are
written to. (templated)
:type output_path: str
:param region: The Google Compute Engine region to run the
prediction job in. (templated)
:type region: str
:param model_name: The Google Cloud ML Engine model to use for prediction.
If version_name is not provided, the default version of this
model will be used.
Should not be None if version_name is provided.
Should be None if uri is provided. (templated)
:type model_name: str
:param version_name: The Google Cloud ML Engine model version to use for
prediction.
Should be None if uri is provided. (templated)
:type version_name: str
:param uri: The GCS path of the saved model to use for prediction.
Should be None if model_name is provided.
It should be a GCS path pointing to a tensorflow SavedModel. (templated)
:type uri: str
:param max_worker_count: The maximum number of workers to be used
for parallel processing. Defaults to 10 if not specified.
:type max_worker_count: int
:param runtime_version: The Google Cloud ML Engine runtime version to use
for batch prediction.
:type runtime_version: str
:param signature_name: The name of the signature defined in the SavedModel
to use for this job.
:type signature_name: str
:param gcp_conn_id: The connection ID used for connection to Google
Cloud Platform.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must
have domain-wide delegation enabled.
:type delegate_to: str
:raises: ``ValueError``: if a unique model/version origin cannot be
determined.
.. attribute:: template_fields
:annotation: = ['_project_id', '_job_id', '_region', '_input_paths', '_output_path', '_model_name', '_version_name', '_uri']
.. method:: execute(self, context)
.. py:class:: MLEngineModelOperator(project_id, model, operation='create', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)
Bases: :class:`airflow.operators.BaseOperator`
Operator for managing a Google Cloud ML Engine model.
:param project_id: The Google Cloud project name to which MLEngine
model belongs. (templated)
:type project_id: str
:param model: A dictionary containing the information about the model.
If the `operation` is `create`, then the `model` parameter should
contain all the information about this model such as `name`.
If the `operation` is `get`, the `model` parameter
should contain the `name` of the model.
:type model: dict
:param operation: The operation to perform. Available operations are:
* ``create``: Creates a new model as provided by the `model` parameter.
* ``get``: Gets a particular model where the name is specified in `model`.
:type operation: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
.. attribute:: template_fields
:annotation: = ['_model']
.. method:: execute(self, context)
.. py:class:: MLEngineVersionOperator(project_id, model_name, version_name=None, version=None, operation='create', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)
Bases: :class:`airflow.operators.BaseOperator`
Operator for managing a Google Cloud ML Engine version.
:param project_id: The Google Cloud project name to which MLEngine
model belongs.
:type project_id: str
:param model_name: The name of the Google Cloud ML Engine model that the version
belongs to. (templated)
:type model_name: str
:param version_name: A name to use for the version being operated upon.
If not None and the `version` argument is None or does not have a value for
the `name` key, then this will be populated in the payload for the
`name` key. (templated)
:type version_name: str
:param version: A dictionary containing the information about the version.
If the `operation` is `create`, `version` should contain all the
information about this version such as name, and deploymentUrl.
If the `operation` is `get` or `delete`, the `version` parameter
should contain the `name` of the version.
If it is None, the only `operation` possible would be `list`. (templated)
:type version: dict
:param operation: The operation to perform. Available operations are:
* ``create``: Creates a new version in the model specified by `model_name`,
in which case the `version` parameter should contain all the
information to create that version
(e.g. `name`, `deploymentUrl`).
* ``get``: Gets full information of a particular version in the model
specified by `model_name`.
The name of the version should be specified in the `version`
parameter.
* ``list``: Lists all available versions of the model specified
by `model_name`.
* ``delete``: Deletes the version specified in `version` parameter from the
model specified by `model_name`).
The name of the version should be specified in the `version`
parameter.
:type operation: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
.. attribute:: template_fields
:annotation: = ['_model_name', '_version_name', '_version']
.. method:: execute(self, context)
.. py:class:: MLEngineTrainingOperator(project_id, job_id, package_uris, training_python_module, training_args, region, scale_tier=None, master_type=None, runtime_version=None, python_version=None, job_dir=None, gcp_conn_id='google_cloud_default', delegate_to=None, mode='PRODUCTION', *args, **kwargs)
Bases: :class:`airflow.operators.BaseOperator`
Operator for launching a MLEngine training job.
:param project_id: The Google Cloud project name within which MLEngine
training job should run (templated).
:type project_id: str
:param job_id: A unique templated id for the submitted Google MLEngine
training job. (templated)
:type job_id: str
:param package_uris: A list of package locations for MLEngine training job,
which should include the main training program + any additional
dependencies. (templated)
:type package_uris: str
:param training_python_module: The Python module name to run within MLEngine
training job after installing 'package_uris' packages. (templated)
:type training_python_module: str
:param training_args: A list of templated command line arguments to pass to
the MLEngine training program. (templated)
:type training_args: str
:param region: The Google Compute Engine region to run the MLEngine training
job in (templated).
:type region: str
:param scale_tier: Resource tier for MLEngine training job. (templated)
:type scale_tier: str
:param master_type: Cloud ML Engine machine name.
Must be set when scale_tier is CUSTOM. (templated)
:type master_type: str
:param runtime_version: The Google Cloud ML runtime version to use for
training. (templated)
:type runtime_version: str
:param python_version: The version of Python used in training. (templated)
:type python_version: str
:param job_dir: A Google Cloud Storage path in which to store training
outputs and other data needed for training. (templated)
:type job_dir: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
:param mode: Can be one of 'DRY_RUN'/'CLOUD'. In 'DRY_RUN' mode, no real
training job will be launched, but the MLEngine training job request
will be printed out. In 'CLOUD' mode, a real MLEngine training job
creation request will be issued.
:type mode: str
.. attribute:: template_fields
:annotation: = ['_project_id', '_job_id', '_package_uris', '_training_python_module', '_training_args', '_region', '_scale_tier', '_master_type', '_runtime_version', '_python_version', '_job_dir']
.. method:: execute(self, context)