blob: 8fdf65a0b01280a27c3b4bad41ee154b65f1902f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.regression.core.util;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.jcraft.jsch.JSchException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.Entities.ClusterMerlin;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
import org.apache.falcon.regression.core.response.APIResult;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.supportClasses.JmsMessageConsumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.http.HttpResponse;
import org.apache.falcon.request.BaseRequest;
import org.apache.falcon.request.RequestKeys;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.Assert;
import org.apache.log4j.Logger;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Source;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
/**
* util methods used across test.
*/
public final class Util {
private Util() {
throw new AssertionError("Instantiating utility class...");
}
private static final Logger LOGGER = Logger.getLogger(Util.class);
public static ServiceResponse sendRequest(String url, String method)
throws IOException, URISyntaxException, AuthenticationException {
return sendRequest(url, method, null, null);
}
public static ServiceResponse sendRequest(String url, String method, String user)
throws IOException, URISyntaxException, AuthenticationException {
return sendRequest(url, method, null, user);
}
public static ServiceResponse sendRequest(String url, String method, String data,
String user)
throws IOException, URISyntaxException, AuthenticationException {
BaseRequest request = new BaseRequest(url, method, user, data);
request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.XML_CONTENT_TYPE);
HttpResponse response = request.run();
return new ServiceResponse(response);
}
public static String getProcessName(String data) {
ProcessMerlin processElement = new ProcessMerlin(data);
return processElement.getName();
}
private static boolean isXML(String data) {
return data != null && data.trim().length() > 0 && data.trim().startsWith("<");
}
public static APIResult parseResponse(ServiceResponse response) throws JAXBException {
if (!isXML(response.getMessage())) {
return new APIResult(APIResult.Status.FAILED, response.getMessage(), "somerandomstring",
response.getCode());
}
JAXBContext jc = JAXBContext.newInstance(APIResult.class);
Unmarshaller u = jc.createUnmarshaller();
APIResult temp;
if (response.getMessage().contains("requestId")) {
temp = (APIResult) u
.unmarshal(new InputSource(new StringReader(response.getMessage())));
temp.setStatusCode(response.getCode());
} else {
temp = new APIResult();
temp.setStatusCode(response.getCode());
temp.setMessage(response.getMessage());
temp.setRequestId("");
if (response.getCode() == 200) {
temp.setStatus(APIResult.Status.SUCCEEDED);
} else {
temp.setStatus(APIResult.Status.FAILED);
}
}
return temp;
}
public static List<String> getStoreInfo(IEntityManagerHelper helper, String subPath)
throws IOException, JSchException {
if (helper.getStoreLocation().startsWith("hdfs:")) {
return HadoopUtil.getAllFilesHDFS(helper.getHadoopFS(),
new Path(helper.getStoreLocation() + subPath));
} else {
return ExecUtil.runRemoteScriptAsSudo(helper.getQaHost(), helper.getUsername(),
helper.getPassword(), "ls " + helper.getStoreLocation() + "/store" + subPath,
helper.getUsername(), helper.getIdentityFile());
}
}
public static String readEntityName(String data) {
if (data.contains("uri:falcon:feed")) {
return new FeedMerlin(data).getName();
} else if (data.contains("uri:falcon:process")) {
return new ProcessMerlin(data).getName();
} else {
return new ClusterMerlin(data).getName();
}
}
public static String getUniqueString() {
return "-" + UUID.randomUUID().toString().split("-")[0];
}
public static List<String> getHadoopDataFromDir(FileSystem fs, String feed, String dir)
throws IOException {
List<String> finalResult = new ArrayList<String>();
String feedPath = getFeedPath(feed);
int depth = feedPath.split(dir)[1].split("/").length - 1;
List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
new Path(dir), depth);
for (Path result : results) {
int pathDepth = result.toString().split(dir)[1].split("/").length - 1;
if (pathDepth == depth) {
finalResult.add(result.toString().split(dir)[1]);
}
}
return finalResult;
}
public static String setFeedProperty(String feed, String propertyName, String propertyValue) {
FeedMerlin feedObject = new FeedMerlin(feed);
boolean found = false;
for (Property prop : feedObject.getProperties().getProperties()) {
//check if it is present
if (prop.getName().equalsIgnoreCase(propertyName)) {
prop.setValue(propertyValue);
found = true;
break;
}
}
if (!found) {
Property property = new Property();
property.setName(propertyName);
property.setValue(propertyValue);
feedObject.getProperties().getProperties().add(property);
}
return feedObject.toString();
}
public static String getFeedPath(String feed) {
FeedMerlin feedObject = new FeedMerlin(feed);
for (Location location : feedObject.getLocations().getLocations()) {
if (location.getType() == LocationType.DATA) {
return location.getPath();
}
}
return null;
}
public static String insertLateFeedValue(String feed, Frequency frequency) {
FeedMerlin feedObject = new FeedMerlin(feed);
feedObject.getLateArrival().setCutOff(frequency);
return feedObject.toString();
}
public static String setFeedPathValue(String feed, String pathValue) {
FeedMerlin feedObject = new FeedMerlin(feed);
for (Location location : feedObject.getLocations().getLocations()) {
if (location.getType() == LocationType.DATA) {
location.setPath(pathValue);
}
}
return feedObject.toString();
}
public static String findFolderBetweenGivenTimeStamps(DateTime startTime, DateTime endTime,
List<String> folderList) {
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
for (String folder : folderList) {
if (folder.compareTo(formatter.print(startTime)) >= 0
&&
folder.compareTo(formatter.print(endTime)) <= 0) {
return folder;
}
}
return null;
}
public static String setFeedName(String feedString, String newName) {
FeedMerlin feedObject = new FeedMerlin(feedString);
feedObject.setName(newName);
return feedObject.toString().trim();
}
public static String setClusterNameInFeed(String feedString, String clusterName,
int clusterIndex) {
FeedMerlin feedObject = new FeedMerlin(feedString);
feedObject.getClusters().getClusters().get(clusterIndex).setName(clusterName);
return feedObject.toString().trim();
}
public static ClusterMerlin getClusterObject(String clusterXML) {
return new ClusterMerlin(clusterXML);
}
public static List<String> getInstanceFinishTimes(ColoHelper coloHelper, String workflowId)
throws IOException, JSchException {
List<String> raw = ExecUtil.runRemoteScriptAsSudo(coloHelper.getProcessHelper()
.getQaHost(), coloHelper.getProcessHelper().getUsername(),
coloHelper.getProcessHelper().getPassword(),
"cat /var/log/ivory/application.* | grep \"" + workflowId + "\" | grep "
+ "\"Received\" | awk '{print $2}'",
coloHelper.getProcessHelper().getUsername(),
coloHelper.getProcessHelper().getIdentityFile()
);
List<String> finalList = new ArrayList<String>();
for (String line : raw) {
finalList.add(line.split(",")[0]);
}
return finalList;
}
public static List<String> getInstanceRetryTimes(ColoHelper coloHelper, String workflowId)
throws IOException, JSchException {
List<String> raw = ExecUtil.runRemoteScriptAsSudo(coloHelper.getProcessHelper()
.getQaHost(), coloHelper.getProcessHelper().getUsername(),
coloHelper.getProcessHelper().getPassword(),
"cat /var/log/ivory/application.* | grep \"" + workflowId + "\" | grep "
+
"\"Retrying attempt\" | awk '{print $2}'",
coloHelper.getProcessHelper().getUsername(),
coloHelper.getProcessHelper().getIdentityFile()
);
List<String> finalList = new ArrayList<String>();
for (String line : raw) {
finalList.add(line.split(",")[0]);
}
return finalList;
}
public static void shutDownService(IEntityManagerHelper helper)
throws IOException, JSchException {
ExecUtil.runRemoteScriptAsSudo(helper.getQaHost(), helper.getUsername(),
helper.getPassword(), helper.getServiceStopCmd(),
helper.getServiceUser(), helper.getIdentityFile());
TimeUtil.sleepSeconds(10);
}
public static void startService(IEntityManagerHelper helper)
throws IOException, JSchException, AuthenticationException, URISyntaxException {
ExecUtil.runRemoteScriptAsSudo(helper.getQaHost(), helper.getUsername(),
helper.getPassword(), helper.getServiceStartCmd(), helper.getServiceUser(),
helper.getIdentityFile());
int statusCode = 0;
for (int tries = 20; tries > 0; tries--) {
try {
statusCode = Util.sendRequest(helper.getHostname(), "get").getCode();
} catch (IOException e) {
LOGGER.info(e.getMessage());
}
if (statusCode == 200) {
return;
}
TimeUtil.sleepSeconds(5);
}
throw new RuntimeException("Service on" + helper.getHostname() + " did not start!");
}
public static void restartService(IEntityManagerHelper helper)
throws IOException, JSchException, AuthenticationException, URISyntaxException {
LOGGER.info("restarting service for: " + helper.getQaHost());
shutDownService(helper);
startService(helper);
}
public static Process getProcessObject(String processData) {
return new ProcessMerlin(processData);
}
public static void printMessageData(JmsMessageConsumer messageConsumer) throws JMSException {
LOGGER.info("dumping all queue data:");
for (MapMessage mapMessage : messageConsumer.getReceivedMessages()) {
StringBuilder stringBuilder = new StringBuilder();
final Enumeration mapNames = mapMessage.getMapNames();
while (mapNames.hasMoreElements()) {
final String propName = mapNames.nextElement().toString();
final String propValue = mapMessage.getString(propName);
stringBuilder.append(propName).append('=').append(propValue).append(' ');
}
LOGGER.info(stringBuilder);
}
}
public static String getEnvClusterXML(String cluster, String prefix) {
ClusterMerlin clusterObject = getClusterObject(cluster);
if ((null == prefix) || prefix.isEmpty()) {
prefix = "";
} else {
prefix = prefix + ".";
}
String hcatEndpoint = Config.getProperty(prefix + "hcat_endpoint");
//now read and set relevant values
for (Interface iface : clusterObject.getInterfaces().getInterfaces()) {
if (iface.getType() == Interfacetype.READONLY) {
iface.setEndpoint(Config.getProperty(prefix + "cluster_readonly"));
} else if (iface.getType() == Interfacetype.WRITE) {
iface.setEndpoint(Config.getProperty(prefix + "cluster_write"));
} else if (iface.getType() == Interfacetype.EXECUTE) {
iface.setEndpoint(Config.getProperty(prefix + "cluster_execute"));
} else if (iface.getType() == Interfacetype.WORKFLOW) {
iface.setEndpoint(Config.getProperty(prefix + "oozie_url"));
} else if (iface.getType() == Interfacetype.MESSAGING) {
iface.setEndpoint(Config.getProperty(prefix + "activemq_url"));
} else if (iface.getType() == Interfacetype.REGISTRY) {
iface.setEndpoint(hcatEndpoint);
}
}
//set colo name:
clusterObject.setColo(Config.getProperty(prefix + "colo"));
// properties in the cluster needed when secure mode is on
if (MerlinConstants.IS_SECURE) {
// get the properties object for the cluster
org.apache.falcon.entity.v0.cluster.Properties clusterProperties =
clusterObject.getProperties();
// add the namenode principal to the properties object
clusterProperties.getProperties().add(getFalconClusterPropertyObject(
"dfs.namenode.kerberos.principal",
Config.getProperty(prefix + "namenode.kerberos.principal", "none")));
// add the hive meta store principal to the properties object
clusterProperties.getProperties().add(getFalconClusterPropertyObject(
"hive.metastore.kerberos.principal",
Config.getProperty(prefix + "hive.metastore.kerberos.principal", "none")));
// Until oozie has better integration with secure hive we need to send the properites to
// falcon.
// hive.metastore.sasl.enabled = true
clusterProperties.getProperties()
.add(getFalconClusterPropertyObject("hive.metastore.sasl.enabled", "true"));
// Only set the metastore uri if its not empty or null.
if (null != hcatEndpoint && !hcatEndpoint.isEmpty()) {
//hive.metastore.uris
clusterProperties.getProperties()
.add(getFalconClusterPropertyObject("hive.metastore.uris", hcatEndpoint));
}
}
return clusterObject.toString();
}
public static org.apache.falcon.entity.v0.cluster.Property
getFalconClusterPropertyObject(String name, String value) {
org.apache.falcon.entity.v0.cluster.Property property = new org
.apache.falcon.entity.v0.cluster.Property();
property.setName(name);
property.setValue(value);
return property;
}
public static EntityType getEntityType(String entity) {
if (entity.contains("uri:falcon:process:0.1")) {
return EntityType.PROCESS;
} else if (entity.contains("uri:falcon:cluster:0.1")) {
return EntityType.CLUSTER;
} else if (entity.contains("uri:falcon:feed:0.1")) {
return EntityType.FEED;
}
return null;
}
/**
* Compares two definitions
* @param server1 server where 1st definition is stored
* @param server2 server where 2nd definition is stored
* @param entity entity which is under analysis
* @return are definitions identical
*/
public static boolean isDefinitionSame(ColoHelper server1, ColoHelper server2,
String entity)
throws URISyntaxException, IOException, AuthenticationException, JAXBException,
SAXException {
return XmlUtil.isIdentical(getEntityDefinition(server1, entity, true),
getEntityDefinition(server2, entity, true));
}
/**
* enums used for instance api.
*/
public enum URLS {
LIST_URL("/api/entities/list"),
SUBMIT_URL("/api/entities/submit"),
GET_ENTITY_DEFINITION("/api/entities/definition"),
DELETE_URL("/api/entities/delete"),
SCHEDULE_URL("/api/entities/schedule"),
VALIDATE_URL("/api/entities/validate"),
SUSPEND_URL("/api/entities/suspend"),
RESUME_URL("/api/entities/resume"),
UPDATE("/api/entities/update"),
STATUS_URL("/api/entities/status"),
SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"),
INSTANCE_RUNNING("/api/instance/running"),
INSTANCE_STATUS("/api/instance/status"),
INSTANCE_KILL("/api/instance/kill"),
INSTANCE_RESUME("/api/instance/resume"),
INSTANCE_SUSPEND("/api/instance/suspend"),
INSTANCE_RERUN("/api/instance/rerun"),
INSTANCE_SUMMARY("/api/instance/summary"),
INSTANCE_PARAMS("/api/instance/params");
private final String url;
URLS(String url) {
this.url = url;
}
public String getValue() {
return this.url;
}
}
/**
* @param pathString whole path
* @return path to basic data folder
*/
public static String getPathPrefix(String pathString) {
return pathString.substring(0, pathString.indexOf('$'));
}
/**
* @param path whole path
* @return file name which is retrieved from a path
*/
public static String getFileNameFromPath(String path) {
return path.substring(path.lastIndexOf('/') + 1, path.length());
}
/**
* Defines request type according to request url
* @param url request url
* @return request type
*/
public static String getMethodType(String url) {
List<String> postList = new ArrayList<String>();
postList.add("/entities/validate");
postList.add("/entities/submit");
postList.add("/entities/submitAndSchedule");
postList.add("/entities/suspend");
postList.add("/entities/resume");
postList.add("/instance/kill");
postList.add("/instance/suspend");
postList.add("/instance/resume");
postList.add("/instance/rerun");
for (String item : postList) {
if (url.toLowerCase().contains(item)) {
return "post";
}
}
List<String> deleteList = new ArrayList<String>();
deleteList.add("/entities/delete");
for (String item : deleteList) {
if (url.toLowerCase().contains(item)) {
return "delete";
}
}
return "get";
}
/**
* Prints xml in readable form
* @param xmlString xmlString
* @return formatted xmlString
*/
public static String prettyPrintXml(final String xmlString) {
if (xmlString == null) {
return null;
}
try {
Source xmlInput = new StreamSource(new StringReader(xmlString));
StringWriter stringWriter = new StringWriter();
StreamResult xmlOutput = new StreamResult(stringWriter);
TransformerFactory transformerFactory = TransformerFactory.newInstance();
transformerFactory.setAttribute("indent-number", "2");
Transformer transformer = transformerFactory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.transform(xmlInput, xmlOutput);
return xmlOutput.getWriter().toString();
} catch (TransformerConfigurationException e) {
return xmlString;
} catch (TransformerException e) {
return xmlString;
}
}
/**
* Converts json string to readable form
* @param jsonString json string
* @return formatted string
*/
public static String prettyPrintJson(final String jsonString) {
if (jsonString == null) {
return null;
}
Gson gson = new GsonBuilder().setPrettyPrinting().create();
JsonElement json = new JsonParser().parse(jsonString);
return gson.toJson(json);
}
/**
* Prints xml or json in pretty and readable format
* @param str xml or json string
* @return converted xml or json
*/
public static String prettyPrintXmlOrJson(final String str) {
if (str == null) {
return null;
}
String cleanStr = str.trim();
//taken from http://stackoverflow.com/questions/7256142/way-to-quickly-check-if-string-is-xml-or-json-in-c-sharp
if (cleanStr.startsWith("{") || cleanStr.startsWith("[")) {
return prettyPrintJson(cleanStr);
}
if (cleanStr.startsWith("<")) {
return prettyPrintXml(cleanStr);
}
LOGGER.warn("The string does not seem to be either json or xml: " + cleanStr);
return str;
}
/**
* Tries to get entity definition.
* @param cluster cluster where definition is stored
* @param entity entity for which definition is required
* @param shouldReturn should the definition be successfully retrieved or not
* @return entity definition
*/
public static String getEntityDefinition(ColoHelper cluster,
String entity,
boolean shouldReturn) throws
JAXBException,
IOException, URISyntaxException, AuthenticationException {
EntityType type = getEntityType(entity);
IEntityManagerHelper helper;
if (EntityType.PROCESS == type) {
helper = cluster.getProcessHelper();
} else if (EntityType.FEED == type) {
helper = cluster.getFeedHelper();
} else {
helper = cluster.getClusterHelper();
}
ServiceResponse response = helper.getEntityDefinition(entity);
if (shouldReturn) {
AssertUtil.assertSucceeded(response);
} else {
AssertUtil.assertFailed(response);
}
String result = response.getMessage();
Assert.assertNotNull(result);
return result;
}
}