title: "Apache Gravitino Flink connector"
slug: /flink-connector/flink-connector
keyword: flink connector federation query
license: "This software is licensed under the Apache License version 2."

Overview

The Apache Gravitino Flink connector implements Flink's Catalog Store interface to manage the catalogs registered in Gravitino. This lets users run federated queries, accessing data from multiple catalogs through a unified interface with consistent access control.

Capabilities

  1. Supports Hive catalog
  2. Supports Iceberg catalog
  3. Supports Paimon catalog
  4. Supports JDBC catalog
  5. Supports most DDL and DML SQL statements

Requirements

  • Flink 1.18
  • Scala 2.12
  • JDK 8, 11 or 17

How to use it

  1. Build or download the Gravitino Flink connector runtime jar, and place it on Flink's classpath.

  2. Configure Flink to use the Gravitino Flink connector.

| Property | Type | Default Value | Description | Required | Since Version |
|----------|------|---------------|-------------|----------|---------------|
| table.catalog-store.kind | string | generic_in_memory | The Catalog Store kind. It must be set to gravitino. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.metalake | string | (none) | The name of the metalake that the Flink connector uses when sending requests to Gravitino. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.uri | string | (none) | The URI of the Gravitino server. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for Gravitino client options. | No | 1.0.0 |

To configure the Gravitino client, use properties prefixed with table.catalog-store.gravitino.gravitino.client.. These properties will be passed to the Gravitino client after removing the table.catalog-store.gravitino. prefix.

Example: Setting table.catalog-store.gravitino.gravitino.client.socketTimeoutMs is equivalent to setting gravitino.client.socketTimeoutMs for the Gravitino client.

Note: Invalid configuration properties will result in exceptions. See the Gravitino Java client configurations for the supported client options.
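
The prefix rule described above can be sketched in plain Java. This is only an illustration of the key rewriting, not the connector's actual code; the class name GravitinoClientPrefixSketch and the method toClientConfig are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: illustrates the documented prefix rule only.
class GravitinoClientPrefixSketch {
    // Prefix that is removed before an option reaches the Gravitino client.
    static final String STORE_PREFIX = "table.catalog-store.gravitino.";
    // Only keys under this prefix are treated as client options.
    static final String CLIENT_PREFIX = STORE_PREFIX + "gravitino.client.";

    // Keeps the client options and strips the catalog-store prefix from their keys.
    static Map<String, String> toClientConfig(Map<String, String> flinkConf) {
        Map<String, String> clientConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CLIENT_PREFIX)) {
                clientConf.put(key.substring(STORE_PREFIX.length()), entry.getValue());
            }
        }
        return clientConf;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("table.catalog-store.kind", "gravitino");
        flinkConf.put("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090");
        flinkConf.put("table.catalog-store.gravitino.gravitino.client.socketTimeoutMs", "60000");
        // Prints {gravitino.client.socketTimeoutMs=60000}
        System.out.println(toClientConfig(flinkConf));
    }
}
```

Keys outside the client prefix, such as the metalake and URI settings, are left out of the result because they are not client options.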

Set the Flink configuration in flink-conf.yaml:

table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: metalake_demo
table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
table.catalog-store.gravitino.gravitino.client.socketTimeoutMs: 60000
table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs: 60000
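
Because the three required properties have no usable default, it can help to check a configuration before starting Flink. The following is a minimal sketch of such a check in plain Java, assuming the configuration is written as flat "key: value" lines like the example above. The class RequiredKeysSketch and its methods are hypothetical and are not part of Flink or Gravitino.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the required catalog store options.
class RequiredKeysSketch {
    static final List<String> REQUIRED = Arrays.asList(
        "table.catalog-store.kind",
        "table.catalog-store.gravitino.gravitino.metalake",
        "table.catalog-store.gravitino.gravitino.uri");

    // Parses flat "key: value" lines. Splitting on the first ": " keeps
    // values such as http://localhost:8090 intact.
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            int sep = trimmed.indexOf(": ");
            if (trimmed.isEmpty() || trimmed.startsWith("#") || sep < 0) {
                continue; // skip blank lines, comments, and lines without a value
            }
            conf.put(trimmed.substring(0, sep), trimmed.substring(sep + 2).trim());
        }
        return conf;
    }

    // Returns one message per problem; an empty list means the check passed.
    static List<String> problems(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!conf.containsKey(key)) {
                problems.add("missing " + key);
            }
        }
        String kind = conf.get("table.catalog-store.kind");
        if (kind != null && !"gravitino".equals(kind)) {
            problems.add("table.catalog-store.kind must be gravitino, found " + kind);
        }
        return problems;
    }

    public static void main(String[] args) {
        String yaml = "table.catalog-store.kind: gravitino\n"
            + "table.catalog-store.gravitino.gravitino.metalake: metalake_demo\n"
            + "table.catalog-store.gravitino.gravitino.uri: http://localhost:8090\n";
        System.out.println(problems(parse(yaml)).isEmpty() ? "ok" : "invalid");
    }
}
```

The parser deliberately handles only flat key-value lines; a real YAML file with nesting would need a proper YAML library.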

Alternatively, set the same options programmatically when creating the TableEnvironment.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Point Flink's catalog store at the Gravitino server and metalake.
final Configuration configuration = new Configuration();
configuration.setString("table.catalog-store.kind", "gravitino");
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "metalake_demo");
configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090");
configuration.setString("table.catalog-store.gravitino.gravitino.client.socketTimeoutMs", "60000");
configuration.setString("table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs", "60000");
// Create a batch-mode TableEnvironment that uses this configuration.
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().withConfiguration(configuration);
TableEnvironment tableEnv = TableEnvironment.create(builder.inBatchMode().build());

  3. Add the necessary jar files to Flink's classpath.

To run Flink with the Gravitino connector and access a data source such as Hive, you may need to add further jars to Flink's classpath. Refer to the Flink documentation for details.

  4. Execute Flink SQL queries.

Suppose the metalake metalake_demo contains a single Hive catalog named catalog_hive.

-- Use the Hive catalog
USE CATALOG catalog_hive;
CREATE DATABASE db;
USE db;
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE hive_students (id INT, name STRING);
INSERT INTO hive_students VALUES (1, 'Alice'), (2, 'Bob');
SELECT * FROM hive_students;

Datatype mapping

The Gravitino Flink connector supports the following data type mappings between Flink and Gravitino.

| Flink Type | Gravitino Type | Since Version |
|------------|----------------|---------------|
| array | list | 0.6.0-incubating |
| bigint | long | 0.6.0-incubating |
| binary | fixed | 0.6.0-incubating |
| boolean | boolean | 0.6.0-incubating |
| char | char | 0.6.0-incubating |
| date | date | 0.6.0-incubating |
| decimal | decimal | 0.6.0-incubating |
| double | double | 0.6.0-incubating |
| float | float | 0.6.0-incubating |
| integer | integer | 0.6.0-incubating |
| map | map | 0.6.0-incubating |
| null | null | 0.6.0-incubating |
| row | struct | 0.6.0-incubating |
| smallint | short | 0.6.0-incubating |
| time | time | 0.6.0-incubating |
| timestamp | timestamp without time zone | 0.6.0-incubating |
| timestamp without time zone | timestamp without time zone | 0.6.0-incubating |
| timestamp with time zone | timestamp with time zone | 0.6.0-incubating |
| timestamp with local time zone | timestamp with time zone | 0.6.0-incubating |
| timestamp_ltz | timestamp with time zone | 0.6.0-incubating |
| tinyint | byte | 0.6.0-incubating |
| varbinary | binary | 0.6.0-incubating |
| varchar | string | 0.6.0-incubating |
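
For quick reference in tooling or tests, the mapping table can be expressed as a simple lookup. The sketch below is plain Java and only mirrors the table by type name; it is not the connector's conversion code, and it ignores type parameters such as length or precision. The class FlinkToGravitinoTypeSketch and the method toGravitino are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical lookup that mirrors the documented mapping table by name only.
class FlinkToGravitinoTypeSketch {
    static final Map<String, String> MAPPING = new LinkedHashMap<>();

    static {
        MAPPING.put("array", "list");
        MAPPING.put("bigint", "long");
        MAPPING.put("binary", "fixed");
        MAPPING.put("boolean", "boolean");
        MAPPING.put("char", "char");
        MAPPING.put("date", "date");
        MAPPING.put("decimal", "decimal");
        MAPPING.put("double", "double");
        MAPPING.put("float", "float");
        MAPPING.put("integer", "integer");
        MAPPING.put("map", "map");
        MAPPING.put("null", "null");
        MAPPING.put("row", "struct");
        MAPPING.put("smallint", "short");
        MAPPING.put("time", "time");
        MAPPING.put("timestamp", "timestamp without time zone");
        MAPPING.put("timestamp without time zone", "timestamp without time zone");
        MAPPING.put("timestamp with time zone", "timestamp with time zone");
        MAPPING.put("timestamp with local time zone", "timestamp with time zone");
        MAPPING.put("timestamp_ltz", "timestamp with time zone");
        MAPPING.put("tinyint", "byte");
        MAPPING.put("varbinary", "binary");
        MAPPING.put("varchar", "string");
    }

    // Looks up a Flink type name case-insensitively; fails for unlisted types.
    static String toGravitino(String flinkType) {
        String key = flinkType.trim().toLowerCase(Locale.ROOT);
        String gravitinoType = MAPPING.get(key);
        if (gravitinoType == null) {
            throw new IllegalArgumentException("No mapping listed for Flink type: " + flinkType);
        }
        return gravitinoType;
    }

    public static void main(String[] args) {
        System.out.println(toGravitino("ROW"));           // prints struct
        System.out.println(toGravitino("timestamp_ltz")); // prints timestamp with time zone
    }
}
```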