This article introduces how to use the data source function, a new feature added in version 1.1.0.
linkis-datasource-client: the client module. DataSourceRemoteClient provides basic management of user data sources, and MetaDataRemoteClient provides metadata query operations (a dependency sketch follows these module descriptions).
linkis-datasource-manager-server: the data source management module, service name ps-data-source-manager. It performs basic management of data sources and exposes http interfaces for adding, querying, modifying, and connection-testing external data sources. Internally it provides an rpc service so that the metadata management module can query, via rpc, the information needed to establish a connection to a database.
linkis-metadata-manager-server: the metadata management module, service name ps-metadatamanager. It provides basic metadata query functions for databases, exposes http interfaces externally, and provides an rpc service internally so that the data source management module can perform data source connection tests via rpc.
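If you call the client from your own project, the client artifact needs to be on your classpath. A minimal sketch of the Maven dependency, assuming the artifact coordinates follow the module name shown above (verify the version against your Linkis deployment):

```xml
<!-- assumed coordinates: the artifactId matches the linkis-datasource-client module name; adjust the version to your deployment -->
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-datasource-client</artifactId>
    <version>1.1.0</version>
</dependency>
```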
The functional structure diagrams are as follows:

*(figure: LinkisDataSourceRemoteClient interface)*

*(figure: LinkisMetaDataRemoteClient interface)*
The source code directory structure is as follows:

```
linkis-public-enhancements/linkis-datasource
├── linkis-datasource-client      // client code
├── linkis-datasource-manager     // data source management module
│   ├── common                    // data source management common module
│   └── server                    // data source management service module
├── linkis-metadata               // module existing in older versions, reserved
├── linkis-metadata-manager       // metadata management module
│   ├── common                    // metadata management common module
│   ├── server                    // metadata management service module
│   └── service                   // supported data sources
│       ├── elasticsearch
│       ├── hive
│       ├── kafka
│       └── mysql
```
The directory structure after installation and deployment is as follows:

```
/lib/linkis-public-enhancements/
├── linkis-ps-data-source-manager
├── linkis-ps-metadatamanager
│   └── service
│       ├── elasticsearch
│       ├── hive
│       ├── kafka
│       └── mysql
```
The parameter `wds.linkis.server.mdm.service.lib.dir` controls the classpath that is loaded for the reflection-based calls. Its default value is `/lib/linkis-public-enhancements/linkis-ps-metadatamanager/service`.
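If the service jars are deployed to a non-default location, this parameter can be overridden; a minimal sketch, assuming it is set in the ps-metadatamanager properties file of your deployment:

```properties
# hedged sketch: directory scanned for the data source service jars (shown here with its default value)
wds.linkis.server.mdm.service.lib.dir=/lib/linkis-public-enhancements/linkis-ps-metadatamanager/service
```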
For more parameters, see Tuning and Troubleshooting > Parameter List, section "datasource configuration parameters".
1. Some database drivers need to be installed by users themselves, because their licenses may not be compatible with the Apache license.
2. Directory for extra database drivers: `./lib/linkis-public-enhancements/linkis-ps-publicservice`
3. List of database drivers (a copy example follows the table):
| Driver Name | Driver Version | Download Link |
| --- | --- | --- |
| db2 | db2jcc4 | https://www.ibm.com/support/pages/db2-jdbc-driver-versions-and-downloads |
| dameng | DmJdbcDriver18 | https://download.dameng.com/eco/docs/JAVA_Mybatis_lib.zip |
| mysql | 5.1.34 | https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.34/mysql-connector-java-5.1.34.jar |
| kingbase | kingbase8 | http://maven.jeecg.org/nexus/content/repositories/jeecg/kingbase/kingbase8/8/kingbase8-8.jar |
| greenplum | 5.1.4 | https://network.pivotal.io/products/vmware-tanzu-greenplum#/releases/985537/file_groups/5749 |
| postgresql | 42.3.1 | https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.1/postgresql-42.3.1.jar |
| sqlserver | sqlserver2000 | https://www.microsoft.com/en-us/download/details.aspx?id=11774 |
| oracle | 11.2.0.3 | http://www.datanucleus.org/downloads/maven2/oracle/ojdbc6/11.2.0.3/ojdbc6-11.2.0.3.jar |
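For example, to add the MySQL driver listed above, a minimal sketch, assuming `$LINKIS_HOME` points at the Linkis installation directory (a hypothetical variable used here only for illustration):

```bash
# download the driver from the link in the table and copy it into the extra driver directory
cd $LINKIS_HOME
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.34/mysql-connector-java-5.1.34.jar
cp mysql-connector-java-5.1.34.jar ./lib/linkis-public-enhancements/linkis-ps-publicservice/
# restart the related services afterwards so the new driver is picked up
```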
By default, the Linkis startup scripts do not start the two services related to the data source (ps-data-source-manager and ps-metadatamanager). To use the data source function, enable them by setting `export ENABLE_METADATA_MANAGER=true` in `$LINKIS_CONF_DIR/linkis-env.sh`. The data source services are then started and stopped together with the other services via linkis-start-all.sh / linkis-stop-all.sh, as sketched below.
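A minimal sketch of the steps, assuming the standard linkis-start-all.sh / linkis-stop-all.sh scripts under the installation's sbin directory:

```bash
# 1. enable the data source services in $LINKIS_CONF_DIR/linkis-env.sh
export ENABLE_METADATA_MANAGER=true

# 2. restart so ps-data-source-manager and ps-metadatamanager start together with the other services
sh sbin/linkis-stop-all.sh
sh sbin/linkis-start-all.sh
```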
You can check on the Eureka page whether the services have started normally.
:::caution note
The use of a data source is divided into three steps: creating the data source, configuring and publishing its connection parameters, and using the data source.
:::
Via the management console you can only create and configure data sources and test whether they can be connected normally; metadata cannot be queried directly.
These data sources are supported through a generic JDBC module; on the web UI you can choose any of the types listed below.
| Data Source | Link |
| --- | --- |
| mysql | https://www.mysql.com |
| oracle | https://www.oracle.com/database/technologies |
| kingbase | https://www.kingbase.com.cn |
| postgresql | https://www.postgresql.org |
| sqlserver | https://www.microsoft.com/en-us/sql-server |
| db2 | https://www.ibm.com/products/db2/database |
| greenplum | https://greenplum.org |
| dm | https://dmdatabases.com |
| doris | https://doris.apache.org |
| clickhouse | https://clickhouse.com |
Take MySQL as an example:
Data Source Management > New Data Source > Select MySQL Type
Enter the relevant configuration information
After the data source is saved successfully, you can use the connection test to verify whether it can be connected normally
:::caution note
Publishing a configuration version means using that configuration for the connection to the data source: click on the version, and in the pop-up page select the appropriate configuration to publish.
:::
scala code example:
```scala
package org.apache.linkis.datasource.client

import java.util
import java.util.concurrent.TimeUnit

import org.apache.linkis.common.utils.JsonUtils
import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
import org.apache.linkis.datasource.client.request._
import org.apache.linkis.datasource.client.response._
import org.apache.linkis.datasourcemanager.common.domain.DataSource
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
import org.junit.jupiter.api.{Disabled, Test}

object TestMysqlClient {

  val gatewayUrl = "http://127.0.0.1:9001"

  val clientConfig = DWSClientConfigBuilder.newBuilder
    .addServerUrl(gatewayUrl)
    .connectionTimeout(30000)
    .discoveryEnabled(false)
    .discoveryFrequency(1, TimeUnit.MINUTES)
    .loadbalancerEnabled(true)
    .maxConnectionSize(1)
    .retryEnabled(false)
    .readTimeout(30000)
    .setAuthenticationStrategy(new StaticAuthenticationStrategy)
    .setAuthTokenKey("hadoop")
    .setAuthTokenValue("xxxxx")
    .setDWSVersion("v1")

  val dataSourceclient = new LinkisDataSourceRemoteClient(clientConfig.build())

  val clientConfig2 = DWSClientConfigBuilder.newBuilder
    .addServerUrl(gatewayUrl)
    .connectionTimeout(30000)
    .discoveryEnabled(false)
    .discoveryFrequency(1, TimeUnit.MINUTES)
    .loadbalancerEnabled(true)
    .maxConnectionSize(1)
    .retryEnabled(false)
    .readTimeout(30000)
    .setAuthenticationStrategy(new StaticAuthenticationStrategy)
    .setAuthTokenKey("hadoop")
    .setAuthTokenValue("xxxxx")
    .setDWSVersion("v1")

  val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig2.build())

  @Test
  @Disabled
  def testCreateDataSourceMysql: Unit = {
    val user = "hadoop"
    val system = "Linkis"

    // create data source
    val dataSource = new DataSource()
    val dataSourceName = "for-mysql-test"
    dataSource.setDataSourceName(dataSourceName)
    dataSource.setDataSourceDesc("this is for mysql test")
    dataSource.setCreateSystem(system)
    dataSource.setDataSourceTypeId(1L)

    val map = JsonUtils.jackson.readValue(JsonUtils.jackson.writeValueAsString(dataSource), new util.HashMap[String, Any]().getClass)
    val createDataSourceAction: CreateDataSourceAction = CreateDataSourceAction.builder()
      .setUser(user)
      .addRequestPayloads(map)
      .build()
    val createDataSourceResult: CreateDataSourceResult = dataSourceclient.createDataSource(createDataSourceAction)
    val dataSourceId = createDataSourceResult.getInsertId

    // set connection parameters
    val params = new util.HashMap[String, Any]
    val connectParams = new util.HashMap[String, Any]
    connectParams.put("host", "127.0.0.1")
    connectParams.put("port", "36000")
    connectParams.put("username", "db username")
    connectParams.put("password", "db password")
    params.put("connectParams", connectParams)
    params.put("comment", "init")

    val updateParameterAction: UpdateDataSourceParameterAction = UpdateDataSourceParameterAction.builder()
      .setUser(user)
      .setDataSourceId(dataSourceId)
      .addRequestPayloads(params)
      .build()
    val updateParameterResult: UpdateDataSourceParameterResult = dataSourceclient.updateDataSourceParameter(updateParameterAction)
    val version: Long = updateParameterResult.getVersion

    // publish configuration version
    dataSourceclient.publishDataSourceVersion(
      PublishDataSourceVersionAction.builder()
        .setDataSourceId(dataSourceId)
        .setUser(user)
        .setVersion(version)
        .build())

    // use example
    val metadataGetDatabasesAction: MetadataGetDatabasesAction = MetadataGetDatabasesAction.builder()
      .setUser(user)
      .setDataSourceName(dataSourceName)
      .setSystem(system)
      .build()
    val metadataGetDatabasesResult: MetadataGetDatabasesResult = metaDataClient.getDatabases(metadataGetDatabasesAction)

    val metadataGetTablesAction: MetadataGetTablesAction = MetadataGetTablesAction.builder()
      .setUser(user)
      .setDataSourceName(dataSourceName)
      .setDatabase("linkis")
      .setSystem(system)
      .build()
    val metadataGetTablesResult: MetadataGetTablesResult = metaDataClient.getTables(metadataGetTablesAction)

    val metadataGetColumnsAction = MetadataGetColumnsAction.builder()
      .setUser(user)
      .setDataSourceName(dataSourceName)
      .setDatabase("linkis")
      .setSystem(system)
      .setTable("linkis_datasource")
      .build()
    val metadataGetColumnsResult: MetadataGetColumnsResult = metaDataClient.getColumns(metadataGetColumnsAction)
  }
}
```
Via the management console you can only create and configure data sources and test whether they can be connected normally; metadata cannot be queried directly.
You first need to configure the cluster environment information, which is stored in the table `linkis_ps_dm_datasource_env`.
```sql
INSERT INTO `linkis_ps_dm_datasource_env`
    (`env_name`, `env_desc`, `datasource_type_id`, `parameter`, `create_user`, `modify_user`)
VALUES
    ('testEnv', 'Test Environment', 4,
     '{\r\n  "uris": "thrift://clustername:9083",\r\n  "keytab": "4dd408ad-a2f9-4501-83b3-139290977ca2",\r\n  "principle": "hadoop@WEBANK.COM",\r\n  "hadoopConf": {"hive.metastore.execute.setugi": "true"}\r\n}',
     'user', 'user');
```
The primary key `id` of this record is used as the envId. When establishing a connection, this envId parameter is needed to obtain information about the cluster configuration (a lookup example is shown after the field explanation). Explanation of the configuration fields:
{ "uris": "thrift://clustername:9083", # Mandatory If kerberos authentication is not enabled, the following [keytab][principle] parameters can be empty "keytab": "bml resource id", //keytab stores the resourceId in the material library, and currently needs to be manually uploaded through the http interface. "principle": "hadoop@WEBANK.COM" //Authentication principle "hadoopConf":{} //Additional connection parameters are optional }
How to obtain the keytab's resourceId: a basic material management function is still being planned, so for now the resourceId can be obtained by uploading the file through the http interface. Reference example:
```bash
curl --form "file=@file path" \
  --form system=subsystem name \
  -H "Token-Code: authentication token" \
  -H "Token-User: authentication user name" \
  http://linkis-gatewayip:port/api/rest_j/v1/bml/upload
```

Example:

```bash
curl --form "file=@/appcom/keytab/hadoop.keytab" \
  --form system=ABCD \
  -H "Token-Code:QML-AUTH" \
  -H "Token-User:hadoop" \
  http://127.0.0.1:9001/api/rest_j/v1/bml/upload
```

The resourceId in the response is the corresponding `bml resource id` value:

```json
{"method":"/bml/upload","status":0,"message":"The task of submitting and uploading resources was successful","data":{"resourceId":"6e4e54fc-cc97-4d0d-8d5e-a311129ec84e","version":"v000001","taskId":35}}
```
Creation on the web console is similar to the MySQL example above. Scala code example:
```scala
package org.apache.linkis.datasource.client

import java.util
import java.util.concurrent.TimeUnit

import org.apache.linkis.common.utils.JsonUtils
import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
import org.apache.linkis.datasource.client.request._
import org.apache.linkis.datasource.client.response._
import org.apache.linkis.datasourcemanager.common.domain.DataSource
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
import org.junit.jupiter.api.{Disabled, Test}

object TestHiveClient {

  val gatewayUrl = "http://127.0.0.1:9001"

  val clientConfig = DWSClientConfigBuilder.newBuilder
    .addServerUrl(gatewayUrl)
    .connectionTimeout(30000)
    .discoveryEnabled(false)
    .discoveryFrequency(1, TimeUnit.MINUTES)
    .loadbalancerEnabled(true)
    .maxConnectionSize(1)
    .retryEnabled(false)
    .readTimeout(30000)
    .setAuthenticationStrategy(new StaticAuthenticationStrategy)
    .setAuthTokenKey("hadoop")
    .setAuthTokenValue("xxxxx")
    .setDWSVersion("v1")

  val dataSourceclient = new LinkisDataSourceRemoteClient(clientConfig.build())

  val clientConfig2 = DWSClientConfigBuilder.newBuilder
    .addServerUrl(gatewayUrl)
    .connectionTimeout(30000)
    .discoveryEnabled(false)
    .discoveryFrequency(1, TimeUnit.MINUTES)
    .loadbalancerEnabled(true)
    .maxConnectionSize(1)
    .retryEnabled(false)
    .readTimeout(30000)
    .setAuthenticationStrategy(new StaticAuthenticationStrategy)
    .setAuthTokenKey("hadoop")
    .setAuthTokenValue("xxxxx")
    .setDWSVersion("v1")

  val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig2.build())

  @Test
  @Disabled
  def testCreateDataSourceHive: Unit = {
    val user = "hadoop"
    val system = "Linkis"

    // create data source
    val dataSource = new DataSource()
    val dataSourceName = "for-hive-test"
    dataSource.setDataSourceName(dataSourceName)
    dataSource.setDataSourceDesc("this is for hive test")
    dataSource.setCreateSystem(system)
    dataSource.setDataSourceTypeId(4L)

    val map = JsonUtils.jackson.readValue(JsonUtils.jackson.writeValueAsString(dataSource), new util.HashMap[String, Any]().getClass)
    val createDataSourceAction: CreateDataSourceAction = CreateDataSourceAction.builder()
      .setUser(user)
      .addRequestPayloads(map)
      .build()
    val createDataSourceResult: CreateDataSourceResult = dataSourceclient.createDataSource(createDataSourceAction)
    val dataSourceId = createDataSourceResult.getInsertId

    // set connection parameters: envId is the primary key of the cluster environment record configured above
    val params = new util.HashMap[String, Any]
    val connectParams = new util.HashMap[String, Any]
    connectParams.put("envId", "3")
    params.put("connectParams", connectParams)
    params.put("comment", "init")

    val updateParameterAction: UpdateDataSourceParameterAction = UpdateDataSourceParameterAction.builder()
      .setUser(user)
      .setDataSourceId(dataSourceId)
      .addRequestPayloads(params)
      .build()
    val updateParameterResult: UpdateDataSourceParameterResult = dataSourceclient.updateDataSourceParameter(updateParameterAction)
    val version: Long = updateParameterResult.getVersion

    // publish configuration version
    dataSourceclient.publishDataSourceVersion(
      PublishDataSourceVersionAction.builder()
        .setDataSourceId(dataSourceId)
        .setUser(user)
        .setVersion(version)
        .build())

    // use example
    val metadataGetDatabasesAction: MetadataGetDatabasesAction = MetadataGetDatabasesAction.builder()
      .setUser(user)
      .setDataSourceName(dataSourceName)
      .setSystem(system)
      .build()
    val metadataGetDatabasesResult: MetadataGetDatabasesResult = metaDataClient.getDatabases(metadataGetDatabasesAction)

    val metadataGetTablesAction: MetadataGetTablesAction = MetadataGetTablesAction.builder()
      .setUser(user)
      .setDataSourceName(dataSourceName)
      .setDatabase("linkis_test_ind")
      .setSystem(system)
      .build()
    val metadataGetTablesResult: MetadataGetTablesResult = metaDataClient.getTables(metadataGetTablesAction)

    val metadataGetColumnsAction = MetadataGetColumnsAction.builder()
      .setUser(user)
      .setDataSourceName(dataSourceName)
      .setDatabase("linkis_test_ind")
      .setSystem(system)
      .setTable("test")
      .build()
    val metadataGetColumnsResult: MetadataGetColumnsResult = metaDataClient.getColumns(metadataGetColumnsAction)
  }
}
```