:py:mod:`airflow.providers.apache.hive.hooks.hive`
==================================================

.. py:module:: airflow.providers.apache.hive.hooks.hive


Module Contents
---------------

Classes
~~~~~~~

.. autoapisummary::

   airflow.providers.apache.hive.hooks.hive.HiveCliHook
   airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook
   airflow.providers.apache.hive.hooks.hive.HiveServer2Hook



Functions
~~~~~~~~~

.. autoapisummary::

   airflow.providers.apache.hive.hooks.hive.get_context_from_env_var



Attributes
~~~~~~~~~~

.. autoapisummary::

   airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES


.. py:data:: HIVE_QUEUE_PRIORITIES
   :annotation: = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW']



.. py:function:: get_context_from_env_var()

   Extract context from environment variables, e.g. ``dag_id``, ``task_id`` and
   ``execution_date``, so that they can be used inside BashOperator and
   PythonOperator.

   :return: The context of interest.
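
   A minimal sketch of calling this from a task callable (illustrative; the
   exact keys present depend on which ``AIRFLOW_CTX_*`` environment variables
   Airflow has set for the running task):

   .. code-block:: python

      from airflow.providers.apache.hive.hooks.hive import get_context_from_env_var


      def my_callable():
          # Keys such as 'dag_id' and 'task_id' come from the AIRFLOW_CTX_*
          # environment variables exported for each task instance.
          context = get_context_from_env_var()
          print(context.get("dag_id"), context.get("task_id"))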


.. py:class:: HiveCliHook(hive_cli_conn_id = default_conn_name, run_as = None, mapred_queue = None, mapred_queue_priority = None, mapred_job_name = None)

   Bases: :py:obj:`airflow.hooks.base.BaseHook`

   Simple wrapper around the hive CLI.

   It also supports ``beeline``, a lighter CLI that uses JDBC and is
   replacing the heavier traditional CLI. To enable ``beeline``, set the
   ``use_beeline`` param in the extra field of your connection, as in
   ``{ "use_beeline": true }``.

   Note that you can also set default hive CLI parameters by passing
   ``hive_cli_params`` in the extra field of your connection, as in
   ``{"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}``.
   Parameters passed here can be overridden by ``run_cli``'s ``hive_conf``
   param.

   The extra connection parameter ``auth`` gets passed into the ``jdbc``
   connection string as is.

   :param hive_cli_conn_id: Reference to the
       :ref:`Hive CLI connection id <howto/connection:hive_cli>`.
   :param mapred_queue: queue used by the Hadoop Scheduler (Capacity or Fair)
   :param mapred_queue_priority: priority within the job queue.
       Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
   :param mapred_job_name: This name will appear in the jobtracker.
       This can make monitoring easier.

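   A hedged example of a connection configured for ``beeline`` (the host and
   port below are placeholders, not defaults shipped with Airflow):

   .. code-block:: python

      from airflow.models.connection import Connection

      # Illustrative connection; adjust host/port for your HiveServer2 endpoint.
      conn = Connection(
          conn_id="hive_cli_default",
          conn_type="hive_cli",
          host="hiveserver2.example.com",
          port=10000,
          extra='{"use_beeline": true, "hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}',
      )
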
   .. py:attribute:: conn_name_attr
      :annotation: = hive_cli_conn_id



   .. py:attribute:: default_conn_name
      :annotation: = hive_cli_default



   .. py:attribute:: conn_type
      :annotation: = hive_cli



   .. py:attribute:: hook_name
      :annotation: = Hive Client Wrapper


   .. py:method:: run_cli(self, hql, schema = None, verbose = True, hive_conf = None)

      Run an hql statement using the hive cli. If ``hive_conf`` is specified,
      it should be a dict, and the entries will be set as key/value pairs in
      HiveConf.

      :param hql: an hql (hive query language) statement to run with hive cli
      :param schema: Name of hive schema (database) to use
      :param verbose: Provides additional logging. Defaults to True.
      :param hive_conf: if specified these key value pairs will be passed
          to hive as ``-hiveconf "key"="value"``. Note that they will be
          passed after the ``hive_cli_params`` and thus will override
          whatever values are specified in the database.

      >>> hh = HiveCliHook()
      >>> result = hh.run_cli("USE airflow;")
      >>> ("OK" in result)
      True
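
      A sketch of passing per-statement settings via ``hive_conf`` (the queue
      name and table are illustrative):

      .. code-block:: python

         hook = HiveCliHook()
         # hive_conf entries are passed after hive_cli_params, so they
         # override defaults stored on the connection.
         hook.run_cli(
             hql="SELECT COUNT(*) FROM my_table;",
             schema="airflow",
             hive_conf={"mapreduce.job.queuename": "adhoc"},
         )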


   .. py:method:: test_hql(self, hql)

      Test an hql statement using the hive cli and EXPLAIN.

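      A minimal sketch (the table and query are illustrative); per the
      description above, the statement is checked via ``EXPLAIN`` through the
      CLI rather than fully executed:

      .. code-block:: python

         hook = HiveCliHook()
         hook.test_hql("SELECT state, num FROM airflow.static_babynames LIMIT 10;")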

   .. py:method:: load_df(self, df, table, field_dict = None, delimiter = ',', encoding = 'utf8', pandas_kwargs = None, **kwargs)

      Loads a pandas DataFrame into hive.

      Hive data types will be inferred if not passed but column names will
      not be sanitized.

      :param df: DataFrame to load into a Hive table
      :param table: target Hive table, use dot notation to target a
          specific database
      :param field_dict: mapping from column name to hive data type.
          Note that it must be an OrderedDict so as to keep the columns' order.
      :param delimiter: field delimiter in the file
      :param encoding: str encoding to use when writing DataFrame to file
      :param pandas_kwargs: passed to DataFrame.to_csv
      :param kwargs: passed to self.load_file

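      A hedged sketch of loading a small frame (table name and column types
      are illustrative):

      .. code-block:: python

         from collections import OrderedDict

         import pandas as pd

         df = pd.DataFrame({"state": ["NY", "TX"], "num": [100, 200]})
         hook = HiveCliHook()
         # field_dict must preserve column order so the Hive columns line
         # up with the delimited file written behind the scenes.
         hook.load_df(
             df=df,
             table="airflow.static_babynames",
             field_dict=OrderedDict([("state", "STRING"), ("num", "INT")]),
         )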

   .. py:method:: load_file(self, filepath, table, delimiter = ',', field_dict = None, create = True, overwrite = True, partition = None, recreate = False, tblproperties = None)

      Loads a local file into Hive.

      Note that the table generated in Hive uses ``STORED AS textfile``
      which isn't the most efficient serialization format. If a
      large amount of data is loaded and/or if the table gets
      queried considerably, you may want to use this method only to
      stage the data into a temporary table before loading it into its
      final destination using a ``HiveOperator``.

      :param filepath: local filepath of the file to load
      :param table: target Hive table, use dot notation to target a
          specific database
      :param field_dict: A dictionary of the fields' names in the file
          as keys and their Hive types as values.
          Note that it must be an OrderedDict so as to keep the columns' order.
      :param delimiter: field delimiter in the file
      :param create: whether to create the table if it doesn't exist
      :param overwrite: whether to overwrite the data in table or partition
      :param partition: target partition as a dict of partition columns
          and values
      :param recreate: whether to drop and recreate the table at every
          execution
      :param tblproperties: TBLPROPERTIES of the hive table being created


   .. py:method:: kill(self)

      Kill the Hive CLI command.



.. py:class:: HiveMetastoreHook(metastore_conn_id = default_conn_name)

   Bases: :py:obj:`airflow.hooks.base.BaseHook`

   Wrapper to interact with the Hive Metastore.

   :param metastore_conn_id: reference to the
       :ref:`metastore thrift service connection id <howto/connection:hive_metastore>`.

   .. py:attribute:: MAX_PART_COUNT
      :annotation: = 32767



   .. py:attribute:: conn_name_attr
      :annotation: = metastore_conn_id



   .. py:attribute:: default_conn_name
      :annotation: = metastore_default



   .. py:attribute:: conn_type
      :annotation: = hive_metastore



   .. py:attribute:: hook_name
      :annotation: = Hive Metastore Thrift



   .. py:method:: __getstate__(self)


   .. py:method:: __setstate__(self, d)


   .. py:method:: get_metastore_client(self)

      Returns a Hive thrift client.

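      A hedged sketch of talking to the metastore directly (the
      context-manager pattern and method names are assumptions about the
      returned thrift client):

      .. code-block:: python

         hook = HiveMetastoreHook()
         client = hook.get_metastore_client()
         # Opening the client starts the thrift transport; the metastore
         # API then exposes calls such as listing tables in a database.
         with client as c:
             tables = c.get_all_tables("airflow")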

   .. py:method:: get_conn(self)

      Returns connection for the hook.

   .. py:method:: check_for_partition(self, schema, table, partition)

      Checks whether a partition exists.

      :param schema: Name of the hive schema (database) the table belongs to
      :param table: Name of the hive table the partition belongs to
      :param partition: Expression that matches the partitions to check for
          (e.g. ``a = 'b' AND c = 'd'``)
      :rtype: bool

      >>> hh = HiveMetastoreHook()
      >>> t = 'static_babynames_partitioned'
      >>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
      True


   .. py:method:: check_for_named_partition(self, schema, table, partition_name)

      Checks whether a partition with a given name exists.

      :param schema: Name of the hive schema (database) the table belongs to
      :param table: Name of the hive table the partition belongs to
      :param partition_name: Name of the partition to check for (e.g. ``a=b/c=d``)
      :rtype: bool

      >>> hh = HiveMetastoreHook()
      >>> t = 'static_babynames_partitioned'
      >>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
      True
      >>> hh.check_for_named_partition('airflow', t, "ds=xxx")
      False


   .. py:method:: get_table(self, table_name, db = 'default')

      Get a metastore table object.

      >>> hh = HiveMetastoreHook()
      >>> t = hh.get_table(db='airflow', table_name='static_babynames')
      >>> t.tableName
      'static_babynames'
      >>> [col.name for col in t.sd.cols]
      ['state', 'year', 'name', 'gender', 'num']


   .. py:method:: get_tables(self, db, pattern = '*')

      Get metastore table objects for tables matching the pattern.


   .. py:method:: get_databases(self, pattern = '*')

      Get metastore databases matching the pattern.


   .. py:method:: get_partitions(self, schema, table_name, partition_filter = None)

      Returns a list of all partitions in a table. Works only
      for tables with fewer than 32767 partitions (the Java short max value).
      For a subpartitioned table, the number might easily exceed this.

      >>> hh = HiveMetastoreHook()
      >>> t = 'static_babynames_partitioned'
      >>> parts = hh.get_partitions(schema='airflow', table_name=t)
      >>> len(parts)
      1
      >>> parts
      [{'ds': '2015-01-01'}]


   .. py:method:: max_partition(self, schema, table_name, field = None, filter_map = None)

      Returns the maximum value for all partitions with the given field in a table.
      If only one partition key exists in the table, that key will be used as the field.
      ``filter_map`` should be a partition_key:partition_value map and will be used to
      filter out partitions.

      :param schema: schema name.
      :param table_name: table name.
      :param field: partition key to get max partition from.
      :param filter_map: partition_key:partition_value map used for partition filtering.

      >>> hh = HiveMetastoreHook()
      >>> filter_map = {'ds': '2015-01-01'}
      >>> t = 'static_babynames_partitioned'
      >>> hh.max_partition(schema='airflow',
      ...                  table_name=t, field='ds', filter_map=filter_map)
      '2015-01-01'


   .. py:method:: table_exists(self, table_name, db = 'default')

      Check if a table exists.

      >>> hh = HiveMetastoreHook()
      >>> hh.table_exists(db='airflow', table_name='static_babynames')
      True
      >>> hh.table_exists(db='airflow', table_name='does_not_exist')
      False


   .. py:method:: drop_partitions(self, table_name, part_vals, delete_data=False, db='default')

      Drop partitions from the given table matching the ``part_vals`` input.

      :param table_name: table name.
      :param part_vals: list of partition specs.
      :param delete_data: Setting to control whether the underlying data has to be
          deleted in addition to dropping the partitions.
      :param db: Name of the hive schema (database) the table belongs to

      >>> hh = HiveMetastoreHook()
      >>> hh.drop_partitions(db='airflow', table_name='static_babynames',
      ...                    part_vals=['2020-05-01'])
      True



.. py:class:: HiveServer2Hook(*args, schema = None, **kwargs)

   Bases: :py:obj:`airflow.hooks.dbapi.DbApiHook`

   Wrapper around the pyhive library.

   Notes:

   * the default authMechanism is PLAIN; to override it you
     can specify it in the ``extra`` of your connection in the UI
   * the default for run_set_variable_statements is true; if you
     are using Impala you may need to set it to false in the
     ``extra`` of your connection in the UI

   :param hiveserver2_conn_id: Reference to the
       :ref:`Hive Server2 thrift service connection id <howto/connection:hiveserver2>`.
   :param schema: Hive database name.

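   A hedged example of overriding both settings in the connection ``extra``
   (host and port are placeholders; key names follow the notes above):

   .. code-block:: python

      from airflow.models.connection import Connection

      # Illustrative: pick the auth mechanism and, e.g. for Impala,
      # disable the SET statements issued before each query.
      conn = Connection(
          conn_id="hiveserver2_default",
          conn_type="hiveserver2",
          host="hs2.example.com",
          port=10000,
          extra='{"authMechanism": "PLAIN", "run_set_variable_statements": false}',
      )
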
   .. py:attribute:: conn_name_attr
      :annotation: = hiveserver2_conn_id



   .. py:attribute:: default_conn_name
      :annotation: = hiveserver2_default



   .. py:attribute:: conn_type
      :annotation: = hiveserver2



   .. py:attribute:: hook_name
      :annotation: = Hive Server 2 Thrift



   .. py:attribute:: supports_autocommit
      :annotation: = False



   .. py:method:: get_conn(self, schema = None)

      Returns a Hive connection object.


   .. py:method:: get_results(self, hql, schema = 'default', fetch_size = None, hive_conf = None)

      Get results of the provided hql in the target schema.

      :param hql: hql to be executed.
      :param schema: target schema, defaults to 'default'.
      :param fetch_size: max size of result to fetch.
      :param hive_conf: hive_conf to execute along with the hql.
      :return: results of hql execution, dict with data (list of results) and header
      :rtype: dict

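      Given the return shape described above, usage might look like this
      (the table name is illustrative):

      .. code-block:: python

         hook = HiveServer2Hook()
         results = hook.get_results(
             "SELECT name FROM airflow.static_babynames LIMIT 5"
         )
         print(results["header"])  # header metadata for the result set
         print(results["data"])    # list of result rows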

   .. py:method:: to_csv(self, hql, csv_filepath, schema = 'default', delimiter = ',', lineterminator = '\r\n', output_header = True, fetch_size = 1000, hive_conf = None)

      Execute hql in the target schema and write the results to a csv file.

      :param hql: hql to be executed.
      :param csv_filepath: filepath of the csv to write results into.
      :param schema: target schema, defaults to 'default'.
      :param delimiter: delimiter of the csv file, defaults to ','.
      :param lineterminator: lineterminator of the csv file.
      :param output_header: whether to write the column names as a header row,
          defaults to True.
      :param fetch_size: number of result rows fetched and written at a time,
          defaults to 1000.
      :param hive_conf: hive_conf to execute along with the hql.

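      For example, dumping a query to a local csv (the path and query are
      illustrative):

      .. code-block:: python

         hook = HiveServer2Hook()
         hook.to_csv(
             hql="SELECT * FROM airflow.static_babynames LIMIT 100",
             csv_filepath="/tmp/babynames.csv",
             output_header=True,
         )
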

   .. py:method:: get_records(self, hql, schema = 'default', hive_conf = None)

      Get a set of records from a Hive query.

      :param hql: hql to be executed.
      :param schema: target schema, defaults to 'default'.
      :param hive_conf: hive_conf to execute along with the hql.
      :return: result of hive execution
      :rtype: list

      >>> hh = HiveServer2Hook()
      >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
      >>> len(hh.get_records(sql))
      100


   .. py:method:: get_pandas_df(self, hql, schema = 'default', hive_conf = None, **kwargs)

      Get a pandas dataframe from a Hive query.

      :param hql: hql to be executed.
      :param schema: target schema, defaults to 'default'.
      :param hive_conf: hive_conf to execute along with the hql.
      :param kwargs: (optional) passed into pandas.DataFrame constructor
      :return: result of hive execution
      :rtype: pandas.DataFrame

      >>> hh = HiveServer2Hook()
      >>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
      >>> df = hh.get_pandas_df(sql)
      >>> len(df.index)
      100