blob: ec85f13c25446f6eb0563f6bf2efffbf25a65ae0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import junit.framework.TestCase;
import org.apache.ambari.eventdb.model.WorkflowContext;
import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobHistory;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
import org.apache.hadoop.util.StringUtils;
/**
*
*/
public class TestJobHistoryParsing extends TestCase {
static final char LINE_DELIMITER_CHAR = '.';
static final char[] charsToEscape = new char[] {'"', '=', LINE_DELIMITER_CHAR};
private static final char DELIMITER = ' ';
private static final String ID = "WORKFLOW_ID";
private static final String NAME = "WORKFLOW_NAME";
private static final String NODE = "WORKFLOW_NODE_NAME";
private static final String ADJ = "WORKFLOW_ADJACENCIES";
private static final String ID_PROP = "mapreduce.workflow.id";
private static final String NAME_PROP = "mapreduce.workflow.name";
private static final String NODE_PROP = "mapreduce.workflow.node.name";
private static final String ADJ_PROP = "mapreduce.workflow.adjacency";
public void test1() {
Map<String,String[]> adj = new HashMap<String,String[]>();
adj.put("10", new String[] {"20", "30"});
adj.put("20", new String[] {"30"});
adj.put("30", new String[] {});
test("id_0-1", "something.name", "10", adj);
}
public void test2() {
Map<String,String[]> adj = new HashMap<String,String[]>();
adj.put("1=0", new String[] {"2 0", "3\"0."});
adj.put("2 0", new String[] {"3\"0."});
adj.put("3\"0.", new String[] {});
test("id_= 0-1", "something.name", "1=0", adj);
}
public void test3() {
String s = "`~!@#$%^&*()-_=+[]{}|,.<>/?;:'\"";
test(s, s, s, new HashMap<String,String[]>());
}
public void test4() {
Map<String,String[]> adj = new HashMap<String,String[]>();
adj.put("X", new String[] {});
test("", "jobName", "X", adj);
}
public void test(String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adjacencies) {
Configuration conf = new Configuration();
setProperties(conf, workflowId, workflowName, workflowNodeName, adjacencies);
String log = log("JOB", new String[] {ID, NAME, NODE, ADJ},
new String[] {conf.get(ID_PROP), conf.get(NAME_PROP), conf.get(NODE_PROP), JobHistory.JobInfo.getWorkflowAdjacencies(conf)});
ParsedLine line = new ParsedLine(log);
JobID jobid = new JobID("id", 1);
JobSubmittedEvent event = new JobSubmittedEvent(jobid, workflowName, "", 0l, "", null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ));
WorkflowContext context = MapReduceJobHistoryUpdater.buildWorkflowContext(event);
String resultingWorkflowId = workflowId;
if (workflowId.isEmpty())
resultingWorkflowId = jobid.toString().replace("job_", "mr_");
assertEquals("Didn't recover workflowId", resultingWorkflowId, context.getWorkflowId());
assertEquals("Didn't recover workflowName", workflowName, context.getWorkflowName());
assertEquals("Didn't recover workflowNodeName", workflowNodeName, context.getWorkflowEntityName());
Map<String,String[]> resultingAdjacencies = adjacencies;
if (resultingAdjacencies.size() == 0) {
resultingAdjacencies = new HashMap<String,String[]>();
resultingAdjacencies.put(workflowNodeName, new String[] {});
}
assertEquals("Got incorrect number of adjacencies", resultingAdjacencies.size(), context.getWorkflowDag().getEntries().size());
for (WorkflowDagEntry entry : context.getWorkflowDag().getEntries()) {
String[] sTargets = resultingAdjacencies.get(entry.getSource());
assertNotNull("No original targets for " + entry.getSource(), sTargets);
List<String> dTargets = entry.getTargets();
assertEquals("Got incorrect number of targets for " + entry.getSource(), sTargets.length, dTargets.size());
for (int i = 0; i < sTargets.length; i++) {
assertEquals("Got incorrect target for " + entry.getSource(), sTargets[i], dTargets.get(i));
}
}
}
private static void setProperties(Configuration conf, String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adj) {
conf.set(ID_PROP, workflowId);
conf.set(NAME_PROP, workflowName);
conf.set(NODE_PROP, workflowNodeName);
for (Entry<String,String[]> entry : adj.entrySet()) {
conf.setStrings(ADJ_PROP + "." + entry.getKey(), entry.getValue());
}
}
private static String log(String recordType, String[] keys, String[] values) {
int length = recordType.length() + keys.length * 4 + 2;
for (int i = 0; i < keys.length; i++) {
values[i] = StringUtils.escapeString(values[i], StringUtils.ESCAPE_CHAR, charsToEscape);
length += values[i].length() + keys[i].toString().length();
}
// We have the length of the buffer, now construct it.
StringBuilder builder = new StringBuilder(length);
builder.append(recordType);
builder.append(DELIMITER);
for (int i = 0; i < keys.length; i++) {
builder.append(keys[i]);
builder.append("=\"");
builder.append(values[i]);
builder.append("\"");
builder.append(DELIMITER);
}
builder.append(LINE_DELIMITER_CHAR);
return builder.toString();
}
private static class ParsedLine {
static final String KEY = "(\\w+)";
static final String VALUE = "([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)";
static final Pattern keyValPair = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
Map<String,String> props = new HashMap<String,String>();
private String type;
ParsedLine(String fullLine) {
int firstSpace = fullLine.indexOf(" ");
if (firstSpace < 0) {
firstSpace = fullLine.length();
}
if (firstSpace == 0) {
return; // This is a junk line of some sort
}
type = fullLine.substring(0, firstSpace);
String propValPairs = fullLine.substring(firstSpace + 1);
Matcher matcher = keyValPair.matcher(propValPairs);
while (matcher.find()) {
String key = matcher.group(1);
String value = matcher.group(2);
props.put(key, value);
}
}
protected String getType() {
return type;
}
protected String get(String key) {
return props.get(key);
}
}
}