Linkis provides a Java client, UJESClient, that gives users quick access to Linkis background services.
Two test classes are provided under the directory ujes/client/src/test:
com.webank.wedatasphere.linkis.ujes.client.UJESClientImplTestJ # Test class based on Java
com.webank.wedatasphere.linkis.ujes.client.UJESClientImplTest # Test class based on Scala
If you have cloned the source code of Linkis, you can directly run these two test classes.
The sections below describe how to write the code that completes a single execution on Linkis.
<dependency>
    <groupId>com.webank.wedatasphere.linkis</groupId>
    <artifactId>linkis-ujes-client</artifactId>
    <version>0.10.0</version>
</dependency>
package com.webank.bdp.dataworkcloud.ujes.client; import com.webank.wedatasphere.Linkis.common.utils.Utils; import com.webank.wedatasphere.Linkis.httpclient.dws.authentication.StaticAuthenticationStrategy; import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfig; import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfigBuilder; import com.webank.wedatasphere.Linkis.ujes.client.UJESClient; import com.webank.wedatasphere.Linkis.ujes.client.UJESClientImpl; import com.webank.wedatasphere.Linkis.ujes.client.request.JobExecuteAction; import com.webank.wedatasphere.Linkis.ujes.client.request.ResultSetAction; import com.webank.wedatasphere.Linkis.ujes.client.response.JobExecuteResult; import com.webank.wedatasphere.Linkis.ujes.client.response.JobInfoResult; import com.webank.wedatasphere.Linkis.ujes.client.response.JobProgressResult; import com.webank.wedatasphere.Linkis.ujes.client.response.JobStatusResult; import org.apache.commons.io.IOUtils; import java.util.concurrent.TimeUnit; public class UJESClientImplTestJ{ public static void main(String[] args){ // 1. To do the configuration, an instance of DWSClientConfig should be obtained from DWSClientBuilder. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder() .addUJESServerUrl("http://${ip}:${port}") //Specify the ServerUrl,The address of Willink gateway, i.e. http://{ip}:{port} .connectionTimeout(30000) //connectionTimeOut: The connection timeout of the client .discoveryEnabled(true).discoveryFrequency(1, TimeUnit.MINUTES) //Enable service discovery. Once enabled, newly started Gateway will be auto-dicovered. .loadbalancerEnabled(true) // Enable load balancing. Cannot be enabled alone without service discovery enabled. 
.maxConnectionSize(5) //Max connection size, aka the max concurrent threshold .retryEnabled(false).readTimeout(30000) //whether to retry after failure .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy, The authentication strategy of Linkis .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //The authentication key,usually the username;The authentication value,usually the password .setDWSVersion("v1").build(); //The version of Linkis background protocol, currently v1 // 2. Create a UJESClient from DWSClientConfig UJESClient client = new UJESClientImpl(clientConfig); // 3. Begin to execute the code JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder() .setCreator("LinkisClient-Test") //creator. The name of the system which holds the UJES client, used for system level isolation. .addExecuteCode("show tables") //ExecutionCode. The code which is requested to be executed .setEngineType(JobExecuteAction.EngineType$.MODULE$.HIVE()) // The engine type expected by the client, i.e. Spark, Hive, etc... .setUser("johnnwang") //User, The user who makes this request;Used for user level multi-tenant isolation .build()); System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID()); // 4. Synch the status of script execution JobStatusResult status = client.status(jobExecuteResult); while(!status.isCompleted()) { // 5. Synch the status of script execution JobProgressResult progress = client.progress(jobExecuteResult); Utils.sleepQuietly(500); status = client.status(jobExecuteResult); } // 6. Synch the job information of script execution JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult); // 7. Fetch the list of result sets(Multiple result sets will be generated if a user submitted multiple SQL at once) String resultSet = jobInfo.getResultSetList(client)[0]; // 8. 
Fetch detailed result set content with a particular result set info Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent(); System.out.println("fileContents: " + fileContents); IOUtils.closeQuietly(client); } }
import java.util.concurrent.TimeUnit

import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineType
import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
import org.apache.commons.io.IOUtils

/**
 * Scala example of a single code execution on Linkis through the UJES client:
 * configure the client, submit a script, poll until it completes, then print
 * the content of the first result set.
 *
 * Replace the `${ip}`, `${port}`, `${username}` and `${password}` placeholders
 * with real values before running.
 */
object UJESClientImplTest extends App {

  // 1. Build the configuration: a DWSClientConfig is obtained from DWSClientConfigBuilder.
  val clientConfig = DWSClientConfigBuilder.newBuilder()
    // Server URL: the address of the Linkis gateway, i.e. http://{ip}:{port}
    .addUJESServerUrl("http://${ip}:${port}")
    // Connection timeout of the client
    .connectionTimeout(30000)
    // Enable service discovery; once enabled, newly started gateways are auto-discovered
    .discoveryEnabled(true)
    .discoveryFrequency(1, TimeUnit.MINUTES)
    // Enable load balancing; it cannot be enabled without service discovery
    .loadbalancerEnabled(true)
    // Max connection size, i.e. the max concurrency threshold
    .maxConnectionSize(5)
    // Whether to retry after a failure
    .retryEnabled(false)
    // Read timeout of the client
    .readTimeout(30000)
    // Authentication strategy of Linkis
    .setAuthenticationStrategy(new StaticAuthenticationStrategy())
    // Authentication key (usually the username) and value (usually the password)
    .setAuthTokenKey("${username}")
    .setAuthTokenValue("${password}")
    // Version of the Linkis background protocol, currently v1
    .setDWSVersion("v1")
    .build()

  // 2. Create a UJESClient from the DWSClientConfig.
  val client = UJESClient(clientConfig)

  try {
    // 3. Submit the code for execution.
    val jobExecuteResult = client.execute(JobExecuteAction.builder()
      // Creator: name of the system holding the UJES client; used for system-level isolation
      .setCreator("LinkisClient-Test")
      // The code requested to be executed
      .addExecuteCode("show tables")
      // Engine type expected by the client, e.g. Spark, Hive, ...
      .setEngineType(EngineType.SPARK)
      // User who makes this request; used for user-level multi-tenant isolation
      .setUser("${username}")
      .build())
    println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)

    // 4. Poll the status of the script execution until it completes.
    var status = client.status(jobExecuteResult)
    while (!status.isCompleted) {
      // 5. Fetch and report the progress of the script execution.
      val progress = client.progress(jobExecuteResult)
      val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
      println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
      Utils.sleepQuietly(500)
      status = client.status(jobExecuteResult)
    }

    // 6. Fetch the job information of the script execution.
    val jobInfo = client.getJobInfo(jobExecuteResult)

    // 7. Fetch the list of result sets (several are generated when a user submits
    //    multiple SQL statements at once); headOption avoids failing on an empty list.
    jobInfo.getResultSetList(client).headOption match {
      case Some(resultSet) =>
        // 8. Fetch the detailed content of that result set.
        val fileContents = client.resultSet(ResultSetAction.builder()
          .setPath(resultSet)
          .setUser(jobExecuteResult.getUser)
          .build()).getFileContent
        println("fileContents: " + fileContents)
      case None =>
        println("No result set was generated for this job.")
    }
  } finally {
    // Always release the client, even when a step above throws.
    IOUtils.closeQuietly(client)
  }
}