---
title: DataSource Client SDK
sidebar_position: 6
---

Linkis DataSource provides a convenient client interface for Java and Scala. To use it, you only need to add the linkis-datasource-client module as a dependency.

1. import dependent modules

 <dependency>
   <groupId>org.apache.linkis</groupId>
   <artifactId>linkis-datasource-client</artifactId>
   <version>${linkis.version}</version>
 </dependency>
For example:
 <dependency>
   <groupId>org.apache.linkis</groupId>
   <artifactId>linkis-datasource-client</artifactId>
   <version>1.1.0</version>
 </dependency>

2. Scala Test Code

Create a Scala test class named LinkisDataSourceClientTest. The meaning of each interface call is explained in the code comments:


import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.linkis.common.utils.JsonUtils
import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
import org.apache.linkis.datasource.client.request.{CreateDataSourceAction, GetAllDataSourceTypesAction, MetadataGetDatabasesAction, UpdateDataSourceParameterAction}
import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}

import java.io.StringWriter
import java.util
import java.util.concurrent.TimeUnit

/**
 * Example driver for the Linkis DataSource client SDK.
 *
 * Builds a client configuration, creates a data-source client and a metadata
 * client from it, and then runs each sample call in turn. All ids, URLs and
 * credentials below are placeholders and must be adapted to the target
 * environment.
 */
object LinkisDataSourceClientTest {

  /**
   * Entry point: configures the clients and runs every sample call once.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val clientConfig = DWSClientConfigBuilder.newBuilder
      .addServerUrl("http://127.0.0.1:9001") // linkis-mg-gateway url: http://{ip}:{port}
      .connectionTimeout(30000) // connection timeout
      .discoveryEnabled(false) // disable discovery
      .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
      .loadbalancerEnabled(true) // enable load balancing
      .maxConnectionSize(5) // max connection size
      .retryEnabled(false) // disable retry
      .readTimeout(30000) // read timeout
      .setAuthenticationStrategy(new StaticAuthenticationStrategy) // Linkis supports static and token authentication
      .setAuthTokenKey("hadoop") // submit user
      .setAuthTokenValue("xxx") // password or token
      .setDWSVersion("v1") // Linkis REST version
      .build

    // init datasource remote client
    val dataSourceClient = new LinkisDataSourceRemoteClient(clientConfig)
    // init metadata remote client
    val metaDataClient = new LinkisMetaDataRemoteClient(clientConfig)

    // get all datasource types
    testGetAllDataSourceTypes(dataSourceClient)
    // create kafka datasource
    testCreateDataSourceForKafka(dataSourceClient)
    // create es datasource
    testCreateDataSourceForEs(dataSourceClient)
    // update datasource parameter for kafka
    testUpdateDataSourceParameterForKafka(dataSourceClient)
    // update datasource parameter for es
    testUpdateDataSourceParameterForEs(dataSourceClient)
    // get hive metadata database list
    testMetadataGetDatabases(metaDataClient)
  }

  /**
   * Requests all data source types as user "hadoop" and prints the result.
   *
   * @param client data-source client used to issue the request
   */
  def testGetAllDataSourceTypes(client: LinkisDataSourceRemoteClient): Unit = {
    val action = GetAllDataSourceTypesAction.builder().setUser("hadoop").build()
    val allDataSourceTypes = client.getAllDataSourceTypes(action).getAllDataSourceType
    println(allDataSourceTypes)
  }

  /**
   * Creates a data source named "kafka-test" with an explicit kafka
   * [[DataSourceType]] attached and prints the insert id of the response.
   *
   * @param client data-source client used to issue the request
   */
  def testCreateDataSourceForKafka(client: LinkisDataSourceRemoteClient): Unit = {
    val dataSourceType = new DataSourceType
    dataSourceType.setName("kafka")
    dataSourceType.setId("2")
    dataSourceType.setLayers(2)
    dataSourceType.setClassifier("message queue")
    dataSourceType.setDescription("kafka")

    val dataSource = new DataSource()
    dataSource.setDataSourceType(dataSourceType)
    dataSource.setDataSourceName("kafka-test")
    dataSource.setCreateSystem("client")
    dataSource.setDataSourceTypeId(2L)

    println(createDataSource(client, dataSource))
  }

  /**
   * Creates a data source named "es-test" (type id 7) and prints the insert
   * id of the response.
   *
   * @param client data-source client used to issue the request
   */
  def testCreateDataSourceForEs(client: LinkisDataSourceRemoteClient): Unit = {
    val dataSource = new DataSource()
    dataSource.setDataSourceName("es-test")
    dataSource.setCreateSystem("client")
    dataSource.setDataSourceTypeId(7L)

    println(createDataSource(client, dataSource))
  }

  /**
   * Updates the connection parameters (broker list) and comment of data
   * source "7".
   *
   * NOTE(review): the id is hard-coded; presumably it should be the id of the
   * kafka data source created earlier -- confirm against your environment.
   *
   * @param client data-source client used to issue the request
   */
  def testUpdateDataSourceParameterForKafka(client: LinkisDataSourceRemoteClient): Unit = {
    val connParams = new util.HashMap[String, Any]()
    connParams.put("brokers", "127.0.0.1:9092")

    val params = new util.HashMap[String, Any]()
    params.put("connectParams", connParams)
    params.put("comment", "kafka data source")

    client.updateDataSourceParameter(
      UpdateDataSourceParameterAction.builder()
        .setUser("hadoop")
        .setDataSourceId("7")
        .addRequestPayloads(params)
        .build()
    )
  }

  /**
   * Updates the connection parameters (Elasticsearch URL list) and comment of
   * data source "8".
   *
   * NOTE(review): the id is hard-coded; presumably it should be the id of the
   * es data source created earlier -- confirm against your environment.
   *
   * @param client data-source client used to issue the request
   */
  def testUpdateDataSourceParameterForEs(client: LinkisDataSourceRemoteClient): Unit = {
    val elasticUrls = new util.ArrayList[String]()
    elasticUrls.add("http://127.0.0.1:9200")

    val connParams = new util.HashMap[String, Any]()
    connParams.put("elasticUrls", elasticUrls)

    val params = new util.HashMap[String, Any]()
    params.put("connectParams", connParams)
    params.put("comment", "es data source")

    client.updateDataSourceParameter(
      UpdateDataSourceParameterAction.builder()
        .setUser("hadoop")
        .setDataSourceId("8")
        .addRequestPayloads(params)
        .build()
    )
  }

  /**
   * Requests the database list of data source 9 as user "hadoop" and prints
   * it, in line with the other query helpers.
   *
   * @param client metadata client used to issue the request
   */
  def testMetadataGetDatabases(client: LinkisMetaDataRemoteClient): Unit = {
    val action = MetadataGetDatabasesAction.builder()
      .setUser("hadoop")
      .setDataSourceId(9L)
      .setSystem("client")
      .build()
    println(client.getDatabases(action).getDbs)
  }

  /** Sends a create request for `dataSource` as user "hadoop" and returns the response's insert id. */
  private def createDataSource(client: LinkisDataSourceRemoteClient, dataSource: DataSource) = {
    val action = CreateDataSourceAction.builder()
      .setUser("hadoop")
      .addRequestPayloads(toRequestPayload(dataSource))
      .build()
    client.createDataSource(action).getInsert_id
  }

  /** Serializes `dataSource` to JSON and reads it back as a generic map usable as a request payload. */
  private def toRequestPayload(dataSource: DataSource): util.HashMap[String, Any] = {
    val dsJsonWriter = new StringWriter
    JsonUtils.jackson.writeValue(dsJsonWriter, dataSource)
    // classOf avoids allocating a throwaway map just to obtain its runtime class.
    new ObjectMapper().readValue(dsJsonWriter.toString, classOf[util.HashMap[String, Any]])
  }
}