blob: 0d5c2e4ceee58b838724ffb0461cac350488e670 [file] [log] [blame]
.. py:module:: airflow.providers.apache.hive.hooks.hive
Module Contents
.. autoapisummary::
.. autoapisummary::
.. autoapisummary::
:annotation: = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']
.. py:function:: get_context_from_env_var()
Extract context from env variable, e.g. dag_id, task_id and execution_date,
so that they can be used inside BashOperator and PythonOperator.
:return: The context of interest.
.. py:class:: HiveCliHook(hive_cli_conn_id = default_conn_name, run_as = None, mapred_queue = None, mapred_queue_priority = None, mapred_job_name = None)
Bases: :py:obj:`airflow.hooks.base.BaseHook`
Simple wrapper around the hive CLI.
It also supports the ``beeline``
a lighter CLI that runs JDBC and is replacing the heavier
traditional CLI. To enable ``beeline``, set the use_beeline param in the
extra field of your connection as in ``{ "use_beeline": true }``
Note that you can also set default hive CLI parameters using the
``hive_cli_params`` to be used in your connection as in
``{"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}``
Parameters passed here can be overridden by run_cli's hive_conf param
The extra connection parameter ``auth`` gets passed as in the ``jdbc``
connection string as is.
:param hive_cli_conn_id: Reference to the
:ref:`Hive CLI connection id <howto/connection:hive_cli>`.
:param mapred_queue: queue used by the Hadoop Scheduler (Capacity or Fair)
:param mapred_queue_priority: priority within the job queue.
Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
:param mapred_job_name: This name will appear in the jobtracker.
This can make monitoring easier.
.. py:attribute:: conn_name_attr
:annotation: = hive_cli_conn_id
.. py:attribute:: default_conn_name
:annotation: = hive_cli_default
.. py:attribute:: conn_type
:annotation: = hive_cli
.. py:attribute:: hook_name
:annotation: = Hive Client Wrapper
.. py:method:: run_cli(hql, schema = None, verbose = True, hive_conf = None)
Run an hql statement using the hive cli. If hive_conf is specified
it should be a dict and the entries will be set as key/value pairs
in HiveConf.
:param hql: an hql (hive query language) statement to run with hive cli
:param schema: Name of hive schema (database) to use
:param verbose: Provides additional logging. Defaults to True.
:param hive_conf: if specified these key value pairs will be passed
to hive as ``-hiveconf "key"="value"``. Note that they will be
passed after the ``hive_cli_params`` and thus will override
whatever values are specified in the database.
>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
.. py:method:: test_hql(hql)
Test an hql statement using the hive cli and EXPLAIN
.. py:method:: load_df(df, table, field_dict = None, delimiter = ',', encoding = 'utf8', pandas_kwargs = None, **kwargs)
Loads a pandas DataFrame into hive.
Hive data types will be inferred if not passed but column names will
not be sanitized.
:param df: DataFrame to load into a Hive table
:param table: target Hive table, use dot notation to target a
specific database
:param field_dict: mapping from column name to hive data type.
Note that it must be OrderedDict so as to keep columns' order.
:param delimiter: field delimiter in the file
:param encoding: str encoding to use when writing DataFrame to file
:param pandas_kwargs: passed to DataFrame.to_csv
:param kwargs: passed to self.load_file
.. py:method:: load_file(filepath, table, delimiter = ',', field_dict = None, create = True, overwrite = True, partition = None, recreate = False, tblproperties = None)
Loads a local file into Hive
Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
large amount of data is loaded and/or if the tables gets
queried considerably, you may want to use this operator only to
stage the data into a temporary table before loading it into its
final destination using a ``HiveOperator``.
:param filepath: local filepath of the file to load
:param table: target Hive table, use dot notation to target a
specific database
:param delimiter: field delimiter in the file
:param field_dict: A dictionary of the fields name in the file
as keys and their Hive types as values.
Note that it must be OrderedDict so as to keep columns' order.
:param create: whether to create the table if it doesn't exist
:param overwrite: whether to overwrite the data in table or partition
:param partition: target partition as a dict of partition columns
and values
:param recreate: whether to drop and recreate the table at every
:param tblproperties: TBLPROPERTIES of the hive table being created
.. py:method:: kill()
Kill Hive cli command
.. py:class:: HiveMetastoreHook(metastore_conn_id = default_conn_name)
Bases: :py:obj:`airflow.hooks.base.BaseHook`
Wrapper to interact with the Hive Metastore
:param metastore_conn_id: reference to the
:ref: `metastore thrift service connection id <howto/connection:hive_metastore>`.
.. py:attribute:: MAX_PART_COUNT
:annotation: = 32767
.. py:attribute:: conn_name_attr
:annotation: = metastore_conn_id
.. py:attribute:: default_conn_name
:annotation: = metastore_default
.. py:attribute:: conn_type
:annotation: = hive_metastore
.. py:attribute:: hook_name
:annotation: = Hive Metastore Thrift
.. py:method:: __getstate__()
.. py:method:: __setstate__(d)
.. py:method:: get_metastore_client()
Returns a Hive thrift client.
.. py:method:: get_conn()
Returns connection for the hook.
.. py:method:: check_for_partition(schema, table, partition)
Checks whether a partition exists
:param schema: Name of hive schema (database) @table belongs to
:param table: Name of hive table @partition belongs to
:param partition: Expression that matches the partitions to check for
(eg `a = 'b' AND c = 'd'`)
:rtype: bool
>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
.. py:method:: check_for_named_partition(schema, table, partition_name)
Checks whether a partition with a given name exists
:param schema: Name of hive schema (database) @table belongs to
:param table: Name of hive table @partition belongs to
:param partition_name: Name of the partitions to check for (eg `a=b/c=d`)
:rtype: bool
>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
>>> hh.check_for_named_partition('airflow', t, "ds=xxx")
.. py:method:: get_table(table_name, db = 'default')
Get a metastore table object
>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
>>> t.tableName
>>> [ for col in]
['state', 'year', 'name', 'gender', 'num']
.. py:method:: get_tables(db, pattern = '*')
Get a metastore table object
.. py:method:: get_databases(pattern = '*')
Get a metastore table object
.. py:method:: get_partitions(schema, table_name, partition_filter = None)
Returns a list of all partitions in a table. Works only
for tables with less than 32767 (java short max val).
For subpartitioned table, the number might easily exceed this.
>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
>>> parts
[{'ds': '2015-01-01'}]
.. py:method:: max_partition(schema, table_name, field = None, filter_map = None)
Returns the maximum value for all partitions with given field in a table.
If only one partition key exist in the table, the key will be used as field.
filter_map should be a partition_key:partition_value map and will be used to
filter out partitions.
:param schema: schema name.
:param table_name: table name.
:param field: partition key to get max partition from.
:param filter_map: partition_key:partition_value map used for partition filtering.
>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow', ... table_name=t, field='ds', filter_map=filter_map)
.. py:method:: table_exists(table_name, db = 'default')
Check if table exists
>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db='airflow', table_name='static_babynames')
>>> hh.table_exists(db='airflow', table_name='does_not_exist')
.. py:method:: drop_partitions(table_name, part_vals, delete_data=False, db='default')
Drop partitions from the given table matching the part_vals input
:param table_name: table name.
:param part_vals: list of partition specs.
:param delete_data: Setting to control if underlying data have to deleted
in addition to dropping partitions.
:param db: Name of hive schema (database) @table belongs to
>>> hh = HiveMetastoreHook()
>>> hh.drop_partitions(db='airflow', table_name='static_babynames',
.. py:class:: HiveServer2Hook(*args, schema = None, log_sql = True, **kwargs)
Bases: :py:obj:`airflow.providers.common.sql.hooks.sql.DbApiHook`
Wrapper around the pyhive library
* the default auth_mechanism is PLAIN, to override it you
can specify it in the ``extra`` of your connection in the UI
* the default for run_set_variable_statements is true, if you
are using impala you may need to set it to false in the
``extra`` of your connection in the UI
:param hiveserver2_conn_id: Reference to the
:ref: `Hive Server2 thrift service connection id <howto/connection:hiveserver2>`.
:param schema: Hive database name.
.. py:attribute:: conn_name_attr
:annotation: = hiveserver2_conn_id
.. py:attribute:: default_conn_name
:annotation: = hiveserver2_default
.. py:attribute:: conn_type
:annotation: = hiveserver2
.. py:attribute:: hook_name
:annotation: = Hive Server 2 Thrift
.. py:attribute:: supports_autocommit
:annotation: = False
.. py:method:: get_conn(schema = None)
Returns a Hive connection object.
.. py:method:: get_results(sql, schema = 'default', fetch_size = None, hive_conf = None)
Get results of the provided hql in target schema.
:param sql: hql to be executed.
:param schema: target schema, default to 'default'.
:param fetch_size: max size of result to fetch.
:param hive_conf: hive_conf to execute alone with the hql.
:return: results of hql execution, dict with data (list of results) and header
:rtype: dict
.. py:method:: to_csv(sql, csv_filepath, schema = 'default', delimiter = ',', lineterminator = '\r\n', output_header = True, fetch_size = 1000, hive_conf = None)
Execute hql in target schema and write results to a csv file.
:param sql: hql to be executed.
:param csv_filepath: filepath of csv to write results into.
:param schema: target schema, default to 'default'.
:param delimiter: delimiter of the csv file, default to ','.
:param lineterminator: lineterminator of the csv file.
:param output_header: header of the csv file, default to True.
:param fetch_size: number of result rows to write into the csv file, default to 1000.
:param hive_conf: hive_conf to execute alone with the hql.
.. py:method:: get_records(sql, parameters = None, **kwargs)
Get a set of records from a Hive query. You can optionally pass 'schema' kwarg
which specifies target schema and default to 'default'.
:param sql: hql to be executed.
:param parameters: optional configuration passed to get_results
:return: result of hive execution
:rtype: list
>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
.. py:method:: get_pandas_df(sql, schema = 'default', hive_conf = None, **kwargs)
Get a pandas dataframe from a Hive query
:param sql: hql to be executed.
:param schema: target schema, default to 'default'.
:param hive_conf: hive_conf to execute alone with the hql.
:param kwargs: (optional) passed into pandas.DataFrame constructor
:return: result of hive execution
:rtype: DataFrame
>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
:return: pandas.DateFrame