| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.falcon.client; |
| |
| import com.sun.jersey.api.client.Client; |
| import com.sun.jersey.api.client.ClientResponse; |
| import com.sun.jersey.api.client.WebResource; |
| import com.sun.jersey.api.client.config.DefaultClientConfig; |
| import com.sun.jersey.client.urlconnection.HTTPSProperties; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.net.util.TrustManagerUtils; |
| import org.apache.falcon.LifeCycle; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.SchemaHelper; |
| import org.apache.falcon.resource.APIResult; |
| import org.apache.falcon.resource.EntityList; |
| import org.apache.falcon.resource.EntitySummaryResult; |
| import org.apache.falcon.resource.InstancesResult; |
| import org.apache.falcon.resource.InstancesSummaryResult; |
| import org.apache.hadoop.security.authentication.client.AuthenticatedURL; |
| import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; |
| import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; |
| |
| import javax.net.ssl.HostnameVerifier; |
| import javax.net.ssl.HttpsURLConnection; |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.SSLSession; |
| import javax.net.ssl.TrustManager; |
| import javax.ws.rs.HttpMethod; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.UriBuilder; |
| import java.io.BufferedReader; |
| import java.io.ByteArrayInputStream; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.UnsupportedEncodingException; |
| import java.net.URL; |
| import java.security.SecureRandom; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| /** |
| * Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs |
| * against an Falcon instance. |
| */ |
| public class FalconClient { |
| |
| public static final String WS_HEADER_PREFIX = "header:"; |
| public static final String USER = System.getProperty("user.name"); |
| public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER; |
| |
| private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; |
| private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters"; |
| |
| /** |
| * Name of the HTTP cookie used for the authentication token between the client and the server. |
| */ |
| public static final String AUTH_COOKIE = "hadoop.auth"; |
| private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "="; |
| private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator(); |
| |
| public static final int DEFAULT_NUM_RESULTS = 10; |
| |
| public static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() { |
| @Override |
| public boolean verify(String hostname, SSLSession sslSession) { |
| return true; |
| } |
| }; |
| |
| private final WebResource service; |
| private final AuthenticatedURL.Token authenticationToken; |
| |
| private final Properties clientProperties; |
| |
| /** |
| * Create a Falcon client instance. |
| * |
| * @param falconUrl of the server to which client interacts |
| * @throws FalconCLIException - If unable to initialize SSL Props |
| */ |
| public FalconClient(String falconUrl) throws FalconCLIException { |
| this(falconUrl, new Properties()); |
| } |
| |
| /** |
| * Create a Falcon client instance. |
| * |
| * @param falconUrl of the server to which client interacts |
| * @param properties client properties |
| * @throws FalconCLIException - If unable to initialize SSL Props |
| */ |
| public FalconClient(String falconUrl, Properties properties) throws FalconCLIException { |
| try { |
| String baseUrl = notEmpty(falconUrl, "FalconUrl"); |
| if (!baseUrl.endsWith("/")) { |
| baseUrl += "/"; |
| } |
| this.clientProperties = properties; |
| SSLContext sslContext = getSslContext(); |
| DefaultClientConfig config = new DefaultClientConfig(); |
| config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, |
| new HTTPSProperties(ALL_TRUSTING_HOSTNAME_VERIFIER, sslContext) |
| ); |
| Client client = Client.create(config); |
| client.setConnectTimeout(Integer.parseInt(clientProperties.getProperty("falcon.connect.timeout", |
| "180000"))); |
| client.setReadTimeout(Integer.parseInt(clientProperties.getProperty("falcon.read.timeout", "180000"))); |
| service = client.resource(UriBuilder.fromUri(baseUrl).build()); |
| client.resource(UriBuilder.fromUri(baseUrl).build()); |
| authenticationToken = getToken(baseUrl); |
| } catch (Exception e) { |
| throw new FalconCLIException("Unable to initialize Falcon Client object", e); |
| } |
| } |
| |
| private static SSLContext getSslContext() throws Exception { |
| SSLContext sslContext = SSLContext.getInstance("SSL"); |
| sslContext.init( |
| null, |
| new TrustManager[]{TrustManagerUtils.getValidateServerCertificateTrustManager()}, |
| new SecureRandom()); |
| return sslContext; |
| } |
| |
| public Properties getClientProperties() { |
| return clientProperties; |
| } |
| |
| public static AuthenticatedURL.Token getToken(String baseUrl) throws FalconCLIException { |
| AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token(); |
| try { |
| URL url = new URL(baseUrl + AUTH_URL); |
| // using KerberosAuthenticator which falls back to PsuedoAuthenticator |
| // instead of passing authentication type from the command line - bad factory |
| HttpsURLConnection.setDefaultSSLSocketFactory(getSslContext().getSocketFactory()); |
| HttpsURLConnection.setDefaultHostnameVerifier(ALL_TRUSTING_HOSTNAME_VERIFIER); |
| new AuthenticatedURL(AUTHENTICATOR).openConnection(url, currentToken); |
| } catch (Exception ex) { |
| throw new FalconCLIException("Could not authenticate, " + ex.getMessage(), ex); |
| } |
| |
| return currentToken; |
| } |
| |
| /** |
| * Methods allowed on Entity Resources. |
| */ |
| protected static enum Entities { |
| VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML), |
| SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML), |
| UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML), |
| SUBMITandSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML), |
| SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML), |
| SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML), |
| RESUME("api/entities/resume/", HttpMethod.POST, MediaType.TEXT_XML), |
| DELETE("api/entities/delete/", HttpMethod.DELETE, MediaType.TEXT_XML), |
| STATUS("api/entities/status/", HttpMethod.GET, MediaType.TEXT_XML), |
| DEFINITION("api/entities/definition/", HttpMethod.GET, MediaType.TEXT_XML), |
| LIST("api/entities/list/", HttpMethod.GET, MediaType.TEXT_XML), |
| SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML); |
| |
| private String path; |
| private String method; |
| private String mimeType; |
| |
| Entities(String path, String method, String mimeType) { |
| this.path = path; |
| this.method = method; |
| this.mimeType = mimeType; |
| } |
| } |
| |
| /** |
| * Methods allowed on Process Instance Resources. |
| */ |
| protected static enum Instances { |
| RUNNING("api/instance/running/", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| STATUS("api/instance/status/", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| LIST("api/instance/list", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| KILL("api/instance/kill/", HttpMethod.POST, MediaType.APPLICATION_JSON), |
| SUSPEND("api/instance/suspend/", HttpMethod.POST, MediaType.APPLICATION_JSON), |
| RESUME("api/instance/resume/", HttpMethod.POST, MediaType.APPLICATION_JSON), |
| RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON), |
| LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON); |
| |
| private String path; |
| private String method; |
| private String mimeType; |
| |
| Instances(String path, String method, String mimeType) { |
| this.path = path; |
| this.method = method; |
| this.mimeType = mimeType; |
| } |
| } |
| |
| protected static enum AdminOperations { |
| |
| STACK("api/admin/stack", HttpMethod.GET, MediaType.TEXT_PLAIN), |
| VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON); |
| |
| private String path; |
| private String method; |
| private String mimeType; |
| |
| AdminOperations(String path, String method, String mimeType) { |
| this.path = path; |
| this.method = method; |
| this.mimeType = mimeType; |
| } |
| } |
| |
| public String notEmpty(String str, String name) { |
| if (str == null) { |
| |
| throw new IllegalArgumentException(name + " cannot be null"); |
| } |
| if (str.length() == 0) { |
| throw new IllegalArgumentException(name + " cannot be empty"); |
| } |
| return str; |
| } |
| |
| public String schedule(String entityType, String entityName, String colo) |
| throws FalconCLIException { |
| |
| return sendEntityRequest(Entities.SCHEDULE, entityType, entityName, |
| colo); |
| |
| } |
| |
| public String suspend(String entityType, String entityName, String colo) |
| throws FalconCLIException { |
| |
| return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo); |
| |
| } |
| |
| public String resume(String entityType, String entityName, String colo) |
| throws FalconCLIException { |
| |
| return sendEntityRequest(Entities.RESUME, entityType, entityName, colo); |
| |
| } |
| |
| public String delete(String entityType, String entityName) |
| throws FalconCLIException { |
| |
| return sendEntityRequest(Entities.DELETE, entityType, entityName, null); |
| |
| } |
| |
| public String validate(String entityType, String filePath) |
| throws FalconCLIException { |
| |
| InputStream entityStream = getServletInputStream(filePath); |
| return sendEntityRequestWithObject(Entities.VALIDATE, entityType, |
| entityStream, null); |
| } |
| |
| public String submit(String entityType, String filePath) |
| throws FalconCLIException { |
| |
| InputStream entityStream = getServletInputStream(filePath); |
| return sendEntityRequestWithObject(Entities.SUBMIT, entityType, |
| entityStream, null); |
| } |
| |
| public String update(String entityType, String entityName, String filePath, Date effectiveTime) |
| throws FalconCLIException { |
| InputStream entityStream = getServletInputStream(filePath); |
| Entities operation = Entities.UPDATE; |
| WebResource resource = service.path(operation.path).path(entityType).path(entityName); |
| if (effectiveTime != null) { |
| resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(effectiveTime)); |
| } |
| ClientResponse clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(operation.mimeType).type(MediaType.TEXT_XML) |
| .method(operation.method, ClientResponse.class, entityStream); |
| checkIfSuccessful(clientResponse); |
| return parseAPIResult(clientResponse); |
| } |
| |
| public String submitAndSchedule(String entityType, String filePath) |
| throws FalconCLIException { |
| |
| InputStream entityStream = getServletInputStream(filePath); |
| return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE, |
| entityType, entityStream, null); |
| } |
| |
| public String getStatus(String entityType, String entityName, String colo) |
| throws FalconCLIException { |
| |
| return sendEntityRequest(Entities.STATUS, entityType, entityName, colo); |
| } |
| |
| public String getDefinition(String entityType, String entityName) |
| throws FalconCLIException { |
| |
| return sendDefinitionRequest(Entities.DEFINITION, entityType, |
| entityName); |
| } |
| |
| public EntityList getDependency(String entityType, String entityName) |
| throws FalconCLIException { |
| return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName); |
| } |
| |
| //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck |
| |
| public EntityList getEntityList(String entityType, String fields, String filterBy, String filterTags, |
| String orderBy, String sortOrder, |
| Integer offset, Integer numResults) throws FalconCLIException { |
| return sendListRequest(Entities.LIST, entityType, fields, filterBy, |
| filterTags, orderBy, sortOrder, offset, numResults); |
| } |
| |
| public String getEntitySummary(String entityType, String cluster, String start, String end, |
| String fields, String filterBy, String filterTags, |
| String orderBy, String sortOrder, |
| Integer offset, Integer numResults, Integer numInstances) |
| throws FalconCLIException { |
| return sendEntitySummaryRequest(Entities.SUMMARY, entityType, cluster, start, end, fields, filterBy, filterTags, |
| orderBy, sortOrder, offset, numResults, numInstances); |
| } |
| |
| public String getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles, |
| String filterBy, String orderBy, String sortOrder, |
| Integer offset, Integer numResults) throws FalconCLIException { |
| |
| return sendInstanceRequest(Instances.RUNNING, type, entity, null, null, |
| null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); |
| } |
| |
| public String getStatusOfInstances(String type, String entity, |
| String start, String end, |
| String colo, List<LifeCycle> lifeCycles, String filterBy, |
| String orderBy, String sortOrder, |
| Integer offset, Integer numResults) throws FalconCLIException { |
| |
| return sendInstanceRequest(Instances.STATUS, type, entity, start, end, |
| null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); |
| } |
| |
| public String getSummaryOfInstances(String type, String entity, |
| String start, String end, |
| String colo, List<LifeCycle> lifeCycles) throws FalconCLIException { |
| |
| return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, |
| null, null, colo, lifeCycles); |
| } |
| |
| public String killInstances(String type, String entity, String start, |
| String end, String colo, String clusters, |
| String sourceClusters, List<LifeCycle> lifeCycles) |
| throws FalconCLIException, UnsupportedEncodingException { |
| |
| return sendInstanceRequest(Instances.KILL, type, entity, start, end, |
| getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles); |
| } |
| |
| public String suspendInstances(String type, String entity, String start, |
| String end, String colo, String clusters, |
| String sourceClusters, List<LifeCycle> lifeCycles) |
| throws FalconCLIException, UnsupportedEncodingException { |
| |
| return sendInstanceRequest(Instances.SUSPEND, type, entity, start, end, |
| getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles); |
| } |
| |
| public String resumeInstances(String type, String entity, String start, |
| String end, String colo, String clusters, |
| String sourceClusters, List<LifeCycle> lifeCycles) |
| throws FalconCLIException, UnsupportedEncodingException { |
| |
| return sendInstanceRequest(Instances.RESUME, type, entity, start, end, |
| getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles); |
| } |
| |
| public String rerunInstances(String type, String entity, String start, |
| String end, String filePath, String colo, |
| String clusters, String sourceClusters, List<LifeCycle> lifeCycles) |
| throws FalconCLIException, IOException { |
| |
| StringBuilder buffer = new StringBuilder(); |
| if (filePath != null) { |
| BufferedReader in = null; |
| try { |
| in = new BufferedReader(new FileReader(filePath)); |
| |
| String str; |
| while ((str = in.readLine()) != null) { |
| buffer.append(str).append("\n"); |
| } |
| } finally { |
| IOUtils.closeQuietly(in); |
| } |
| } |
| String temp = (buffer.length() == 0) ? null : buffer.toString(); |
| return sendInstanceRequest(Instances.RERUN, type, entity, start, end, |
| getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles); |
| } |
| |
| public String rerunInstances(String type, String entity, String start, |
| String end, String colo, String clusters, String sourceClusters, |
| List<LifeCycle> lifeCycles) |
| throws FalconCLIException, UnsupportedEncodingException { |
| |
| return sendInstanceRequest(Instances.RERUN, type, entity, start, end, |
| getServletInputStream(clusters, sourceClusters, "oozie.wf.rerun.failnodes=true\n"), null, colo, |
| lifeCycles); |
| } |
| |
| public String getLogsOfInstances(String type, String entity, String start, |
| String end, String colo, String runId, |
| List<LifeCycle> lifeCycles, String filterBy, |
| String orderBy, String sortOrder, Integer offset, Integer numResults) |
| throws FalconCLIException { |
| |
| return sendInstanceRequest(Instances.LOG, type, entity, start, end, |
| null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); |
| } |
| |
| public String getParamsOfInstance(String type, String entity, |
| String start, String colo, |
| String clusters, String sourceClusters, |
| List<LifeCycle> lifeCycles) |
| throws FalconCLIException, UnsupportedEncodingException { |
| |
| return sendInstanceRequest(Instances.PARAMS, type, entity, |
| start, null, null, null, colo, lifeCycles); |
| } |
| |
| public String getThreadDump() throws FalconCLIException { |
| return sendAdminRequest(AdminOperations.STACK); |
| } |
| |
| public String getVersion() throws FalconCLIException { |
| return sendAdminRequest(AdminOperations.VERSION); |
| } |
| |
| public int getStatus() throws FalconCLIException { |
| AdminOperations job = AdminOperations.VERSION; |
| ClientResponse clientResponse = service.path(job.path) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(job.mimeType).type(MediaType.TEXT_PLAIN) |
| .method(job.method, ClientResponse.class); |
| return clientResponse.getStatus(); |
| } |
| |
| /** |
| * Converts a InputStream into ServletInputStream. |
| * |
| * @param filePath - Path of file to stream |
| * @return ServletInputStream |
| * @throws FalconCLIException |
| */ |
| private InputStream getServletInputStream(String filePath) |
| throws FalconCLIException { |
| |
| if (filePath == null) { |
| return null; |
| } |
| InputStream stream; |
| try { |
| stream = new FileInputStream(filePath); |
| } catch (FileNotFoundException e) { |
| throw new FalconCLIException("File not found:", e); |
| } |
| return stream; |
| } |
| |
| private InputStream getServletInputStream(String clusters, |
| String sourceClusters, String properties) |
| throws FalconCLIException, UnsupportedEncodingException { |
| |
| InputStream stream; |
| StringBuilder buffer = new StringBuilder(); |
| if (clusters != null) { |
| buffer.append(FALCON_INSTANCE_ACTION_CLUSTERS).append('=').append(clusters).append('\n'); |
| } |
| if (sourceClusters != null) { |
| buffer.append(FALCON_INSTANCE_SOURCE_CLUSTERS).append('=').append(sourceClusters).append('\n'); |
| } |
| if (properties != null) { |
| buffer.append(properties); |
| } |
| stream = new ByteArrayInputStream(buffer.toString().getBytes()); |
| return (buffer.length() == 0) ? null : stream; |
| } |
| |
| private String sendEntityRequest(Entities entities, String entityType, |
| String entityName, String colo) throws FalconCLIException { |
| |
| WebResource resource = service.path(entities.path) |
| .path(entityType).path(entityName); |
| if (colo != null) { |
| resource = resource.queryParam("colo", colo); |
| } |
| ClientResponse clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(entities.mimeType).type(MediaType.TEXT_XML) |
| .method(entities.method, ClientResponse.class); |
| |
| checkIfSuccessful(clientResponse); |
| |
| return parseAPIResult(clientResponse); |
| } |
| |
| private WebResource addParamsToResource(WebResource resource, |
| String start, String end, String runId, String colo, |
| String fields, String filterBy, String tags, |
| String orderBy, String sortOrder, |
| Integer offset, Integer numResults, Integer numInstances) { |
| |
| if (!StringUtils.isEmpty(fields)) { |
| resource = resource.queryParam("fields", fields); |
| } |
| if (!StringUtils.isEmpty(tags)) { |
| resource = resource.queryParam("tags", tags); |
| } |
| if (!StringUtils.isEmpty(filterBy)) { |
| resource = resource.queryParam("filterBy", filterBy); |
| } |
| if (!StringUtils.isEmpty(orderBy)) { |
| resource = resource.queryParam("orderBy", orderBy); |
| } |
| if (!StringUtils.isEmpty(sortOrder)) { |
| resource = resource.queryParam("sortOrder", sortOrder); |
| } |
| if (!StringUtils.isEmpty(start)) { |
| resource = resource.queryParam("start", start); |
| } |
| if (!StringUtils.isEmpty(end)) { |
| resource = resource.queryParam("end", end); |
| } |
| if (runId != null) { |
| resource = resource.queryParam("runid", runId); |
| } |
| if (colo != null) { |
| resource = resource.queryParam("colo", colo); |
| } |
| if (offset != null) { |
| resource = resource.queryParam("offset", offset.toString()); |
| } |
| if (numResults != null) { |
| resource = resource.queryParam("numResults", numResults.toString()); |
| } |
| if (numInstances != null) { |
| resource = resource.queryParam("numInstances", numInstances.toString()); |
| } |
| return resource; |
| |
| } |
| |
| private String sendEntitySummaryRequest(Entities entities, String entityType, String cluster, |
| String start, String end, |
| String fields, String filterBy, String filterTags, |
| String orderBy, String sortOrder, Integer offset, Integer numResults, |
| Integer numInstances) throws FalconCLIException { |
| WebResource resource; |
| if (StringUtils.isEmpty(cluster)) { |
| resource = service.path(entities.path).path(entityType); |
| } else { |
| resource = service.path(entities.path).path(entityType).path(cluster); |
| } |
| |
| resource = addParamsToResource(resource, start, end, null, null, |
| fields, filterBy, filterTags, |
| orderBy, sortOrder, |
| offset, numResults, numInstances); |
| |
| ClientResponse clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(entities.mimeType).type(MediaType.TEXT_XML) |
| .method(entities.method, ClientResponse.class); |
| |
| checkIfSuccessful(clientResponse); |
| return parseProcessEntitySummaryResult(clientResponse); |
| } |
| //RESUME CHECKSTYLE CHECK ParameterNumberCheck |
| |
| private String sendDefinitionRequest(Entities entities, String entityType, |
| String entityName) throws FalconCLIException { |
| |
| ClientResponse clientResponse = service |
| .path(entities.path).path(entityType).path(entityName) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(entities.mimeType).type(MediaType.TEXT_XML) |
| .method(entities.method, ClientResponse.class); |
| |
| checkIfSuccessful(clientResponse); |
| return clientResponse.getEntity(String.class); |
| } |
| |
| private EntityList sendDependencyRequest(Entities entities, String entityType, |
| String entityName) throws FalconCLIException { |
| |
| ClientResponse clientResponse = service |
| .path(entities.path).path(entityType).path(entityName) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(entities.mimeType).type(MediaType.TEXT_XML) |
| .method(entities.method, ClientResponse.class); |
| |
| checkIfSuccessful(clientResponse); |
| |
| return parseEntityList(clientResponse); |
| } |
| |
| private String sendEntityRequestWithObject(Entities entities, String entityType, |
| Object requestObject, String colo) throws FalconCLIException { |
| WebResource resource = service.path(entities.path) |
| .path(entityType); |
| if (colo != null) { |
| resource = resource.queryParam("colo", colo); |
| } |
| ClientResponse clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(entities.mimeType).type(MediaType.TEXT_XML) |
| .method(entities.method, ClientResponse.class, requestObject); |
| |
| checkIfSuccessful(clientResponse); |
| |
| return parseAPIResult(clientResponse); |
| } |
| |
| //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck |
| private String sendInstanceRequest(Instances instances, String type, |
| String entity, String start, String end, InputStream props, |
| String runid, String colo, |
| List<LifeCycle> lifeCycles) throws FalconCLIException { |
| return sendInstanceRequest(instances, type, entity, start, end, props, |
| runid, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS); |
| } |
| |
| private String sendInstanceRequest(Instances instances, String type, String entity, |
| String start, String end, InputStream props, String runid, String colo, |
| List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, |
| Integer offset, Integer numResults) throws FalconCLIException { |
| checkType(type); |
| WebResource resource = service.path(instances.path).path(type) |
| .path(entity); |
| |
| resource = addParamsToResource(resource, start, end, runid, colo, |
| null, filterBy, null, orderBy, sortOrder, offset, numResults, null); |
| |
| if (lifeCycles != null) { |
| checkLifeCycleOption(lifeCycles, type); |
| for (LifeCycle lifeCycle : lifeCycles) { |
| resource = resource.queryParam("lifecycle", lifeCycle.toString()); |
| } |
| } |
| |
| ClientResponse clientResponse; |
| if (props == null) { |
| clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(instances.mimeType) |
| .method(instances.method, ClientResponse.class); |
| } else { |
| clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(instances.mimeType) |
| .method(instances.method, ClientResponse.class, props); |
| } |
| checkIfSuccessful(clientResponse); |
| |
| if (instances.name().equals("LOG")) { |
| return parseProcessInstanceResultLogs(clientResponse, runid); |
| } else if (instances.name().equals("SUMMARY")) { |
| return summarizeProcessInstanceResult(clientResponse); |
| } else { |
| return parseProcessInstanceResult(clientResponse); |
| } |
| |
| } |
| |
| //RESUME CHECKSTYLE CHECK VisibilityModifierCheck |
| |
| private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException { |
| if (lifeCycles != null && !lifeCycles.isEmpty()) { |
| EntityType entityType = EntityType.valueOf(type.toUpperCase().trim()); |
| for (LifeCycle lifeCycle : lifeCycles) { |
| if (entityType != lifeCycle.getTag().getType()) { |
| throw new FalconCLIException("Incorrect lifecycle: " + lifeCycle + "for given type: " + type); |
| } |
| } |
| } |
| } |
| |
| protected void checkType(String type) throws FalconCLIException { |
| if (type == null || type.isEmpty()) { |
| throw new FalconCLIException("entity type is empty"); |
| } else { |
| EntityType entityType = EntityType.valueOf(type.toUpperCase().trim()); |
| if (entityType == EntityType.CLUSTER) { |
| throw new FalconCLIException( |
| "Instance management functions don't apply to Cluster entities"); |
| } |
| } |
| } |
| |
| //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck |
| private EntityList sendListRequest(Entities entities, String entityType, String fields, String filterBy, |
| String filterTags, String orderBy, String sortOrder, Integer offset, |
| Integer numResults) throws FalconCLIException { |
| WebResource resource = service.path(entities.path) |
| .path(entityType); |
| resource = addParamsToResource(resource, null, null, null, null, fields, filterBy, filterTags, |
| orderBy, sortOrder, offset, numResults, null); |
| |
| ClientResponse clientResponse = resource |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(entities.mimeType).type(MediaType.TEXT_XML) |
| .method(entities.method, ClientResponse.class); |
| |
| checkIfSuccessful(clientResponse); |
| |
| return parseEntityList(clientResponse); |
| } |
| // RESUME CHECKSTYLE CHECK ParameterNumberCheck |
| |
| private String sendAdminRequest(AdminOperations job) throws FalconCLIException { |
| ClientResponse clientResponse = service.path(job.path) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(job.mimeType) |
| .type(job.mimeType) |
| .method(job.method, ClientResponse.class); |
| return parseStringResult(clientResponse); |
| } |
| |
| private String parseAPIResult(ClientResponse clientResponse) |
| throws FalconCLIException { |
| |
| APIResult result = clientResponse.getEntity(APIResult.class); |
| return result.getMessage(); |
| } |
| |
| private EntityList parseEntityList(ClientResponse clientResponse) |
| throws FalconCLIException { |
| |
| EntityList result = clientResponse.getEntity(EntityList.class); |
| if (result == null || result.getElements() == null) { |
| return null; |
| } |
| return result; |
| |
| } |
| |
| private String parseStringResult(ClientResponse clientResponse) |
| throws FalconCLIException { |
| |
| return clientResponse.getEntity(String.class); |
| } |
| |
| private String parseProcessEntitySummaryResult(ClientResponse clientResponse) { |
| EntitySummaryResult result = clientResponse.getEntity(EntitySummaryResult.class); |
| StringBuilder sb = new StringBuilder(); |
| String toAppend; |
| sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); |
| sb.append("\nEntity Summary Result :\n"); |
| if (result.getEntitySummaries() != null) { |
| for (EntitySummaryResult.EntitySummary entitySummary : result.getEntitySummaries()) { |
| |
| toAppend = entitySummary.toString(); |
| sb.append(toAppend).append("\n"); |
| } |
| } |
| sb.append("\nAdditional Information:\n"); |
| sb.append("Response: ").append(result.getMessage()); |
| sb.append("Request Id: ").append(result.getRequestId()); |
| return sb.toString(); |
| } |
| |
| private String summarizeProcessInstanceResult(ClientResponse clientResponse) { |
| InstancesSummaryResult result = clientResponse |
| .getEntity(InstancesSummaryResult.class); |
| StringBuilder sb = new StringBuilder(); |
| String toAppend; |
| |
| sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); |
| sb.append("\nInstances Summary:\n"); |
| |
| if (result.getInstancesSummary() != null) { |
| for (InstancesSummaryResult.InstanceSummary summary : result.getInstancesSummary()) { |
| toAppend = summary.getCluster() != null ? summary.getCluster() : "-"; |
| sb.append("Cluster: ").append(toAppend).append("\n"); |
| |
| sb.append("Status\t\tCount\n"); |
| sb.append("-------------------------\n"); |
| |
| for (Map.Entry<String, Long> entry : summary.getSummaryMap().entrySet()) { |
| sb.append(entry.getKey()).append("\t\t").append(entry.getValue()).append("\n"); |
| } |
| } |
| } |
| |
| sb.append("\nAdditional Information:\n"); |
| sb.append("Response: ").append(result.getMessage()); |
| sb.append("Request Id: ").append(result.getRequestId()); |
| return sb.toString(); |
| } |
| |
| private String parseProcessInstanceResult(ClientResponse clientResponse) { |
| InstancesResult result = clientResponse |
| .getEntity(InstancesResult.class); |
| StringBuilder sb = new StringBuilder(); |
| String toAppend; |
| |
| sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); |
| |
| sb.append("\nInstances:\n"); |
| sb.append("Instance\t\tCluster\t\tSourceCluster\t\tStatus\t\tStart\t\tEnd\t\tDetails\t\t\t\t\tLog\n"); |
| sb.append("-----------------------------------------------------------------------------------------------\n"); |
| if (result.getInstances() != null) { |
| for (InstancesResult.Instance instance : result.getInstances()) { |
| |
| toAppend = instance.getInstance() != null ? instance.getInstance() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getCluster() != null ? instance.getCluster() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getSourceCluster() != null ? instance.getSourceCluster() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = (instance.getStatus() != null ? instance.getStatus().toString() : "-"); |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getStartTime() != null |
| ? SchemaHelper.formatDateUTC(instance.getStartTime()) : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getEndTime() != null |
| ? SchemaHelper.formatDateUTC(instance.getEndTime()) : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = (!StringUtils.isEmpty(instance.getDetails())) |
| ? instance.getDetails() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getLogFile() != null ? instance.getLogFile() : "-"; |
| sb.append(toAppend).append("\n"); |
| |
| if (instance.getWfParams() != null) { |
| Map<String, String> props = instance.getWfParams(); |
| sb.append("Workflow params").append("\n"); |
| for (Map.Entry<String, String> entry : props.entrySet()) { |
| sb.append(entry.getKey()).append("=").append(entry.getValue()).append("\n"); |
| } |
| sb.append("\n"); |
| } |
| |
| } |
| } |
| sb.append("\nAdditional Information:\n"); |
| sb.append("Response: ").append(result.getMessage()); |
| sb.append("Request Id: ").append(result.getRequestId()); |
| return sb.toString(); |
| } |
| |
| private String parseProcessInstanceResultLogs(ClientResponse clientResponse, String runid) { |
| InstancesResult result = clientResponse |
| .getEntity(InstancesResult.class); |
| StringBuilder sb = new StringBuilder(); |
| String toAppend; |
| |
| sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); |
| |
| sb.append("\nInstances:\n"); |
| sb.append("Instance\t\tCluster\t\tSourceCluster\t\tStatus\t\tRunID\t\t\tLog\n"); |
| sb.append("-----------------------------------------------------------------------------------------------\n"); |
| if (result.getInstances() != null) { |
| for (InstancesResult.Instance instance : result.getInstances()) { |
| |
| toAppend = (instance.getInstance() != null) ? instance.getInstance() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getCluster() != null ? instance.getCluster() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getSourceCluster() != null ? instance.getSourceCluster() : "-"; |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = (instance.getStatus() != null ? instance.getStatus().toString() : "-"); |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = (runid != null ? runid : "latest"); |
| sb.append(toAppend).append("\t"); |
| |
| toAppend = instance.getLogFile() != null ? instance.getLogFile() : "-"; |
| sb.append(toAppend).append("\n"); |
| |
| if (instance.actions != null) { |
| sb.append("actions:\n"); |
| for (InstancesResult.InstanceAction action : instance.actions) { |
| sb.append(" ").append(action.getAction()).append("\t"); |
| sb.append(action.getStatus()).append("\t").append(action.getLogFile()).append("\n"); |
| } |
| } |
| } |
| } |
| sb.append("\nAdditional Information:\n"); |
| sb.append("Response: ").append(result.getMessage()); |
| sb.append("Request Id: ").append(result.getRequestId()); |
| return sb.toString(); |
| } |
| |
| protected static enum GraphOperations { |
| |
| VERTICES("api/graphs/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON), |
| EDGES("api/graphs/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON); |
| |
| private String path; |
| private String method; |
| private String mimeType; |
| |
| GraphOperations(String path, String method, String mimeType) { |
| this.path = path; |
| this.method = method; |
| this.mimeType = mimeType; |
| } |
| } |
| |
| public String getVertex(String id) throws FalconCLIException { |
| return sendGraphRequest(GraphOperations.VERTICES, id); |
| } |
| |
| public String getVertices(String key, String value) throws FalconCLIException { |
| return sendGraphRequest(GraphOperations.VERTICES, key, value); |
| } |
| |
| public String getVertexEdges(String id, String direction) throws FalconCLIException { |
| return sendGraphRequestForEdges(GraphOperations.VERTICES, id, direction); |
| } |
| |
| public String getEdge(String id) throws FalconCLIException { |
| return sendGraphRequest(GraphOperations.EDGES, id); |
| } |
| |
| private String sendGraphRequest(GraphOperations job, String id) throws FalconCLIException { |
| ClientResponse clientResponse = service.path(job.path) |
| .path(id) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(job.mimeType) |
| .type(job.mimeType) |
| .method(job.method, ClientResponse.class); |
| return parseStringResult(clientResponse); |
| } |
| |
| private String sendGraphRequest(GraphOperations job, String key, |
| String value) throws FalconCLIException { |
| ClientResponse clientResponse = service.path(job.path) |
| .queryParam("key", key) |
| .queryParam("value", value) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(job.mimeType) |
| .type(job.mimeType) |
| .method(job.method, ClientResponse.class); |
| return parseStringResult(clientResponse); |
| } |
| |
| private String sendGraphRequestForEdges(GraphOperations job, String id, |
| String direction) throws FalconCLIException { |
| ClientResponse clientResponse = service.path(job.path) |
| .path(id) |
| .path(direction) |
| .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) |
| .accept(job.mimeType) |
| .type(job.mimeType) |
| .method(job.method, ClientResponse.class); |
| return parseStringResult(clientResponse); |
| } |
| |
| private void checkIfSuccessful(ClientResponse clientResponse) throws FalconCLIException { |
| Response.Status.Family statusFamily = clientResponse.getClientResponseStatus().getFamily(); |
| if (statusFamily != Response.Status.Family.SUCCESSFUL |
| && statusFamily != Response.Status.Family.INFORMATIONAL) { |
| throw FalconCLIException.fromReponse(clientResponse); |
| } |
| } |
| } |