blob: b547c31054517652e022ce5e221e4e8f985c553e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.oozie.process;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.coordinator.CONFIGURATION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
/**
 * Base class for Falcon unit tests that work with the configuration store and generated Oozie artifacts.
 */
public class AbstractTestBase {
/**
 * Loads an entity definition from a classpath resource and publishes it to the configuration store.
 *
 * <p>When {@code name} is non-null, the store entry of the same type and name is removed first and
 * the freshly loaded entity is renamed to {@code name} before it is published.
 *
 * <p>For clusters, a non-null {@code writeEndpoint} is additionally set on the cluster's WRITE
 * interface (after publishing), and empty {@code ext.jar} files are created under the cluster's
 * "working" location for the FEED retention and replication lifecycles.
 *
 * @param type entity type to load; CLUSTER, FEED and PROCESS are handled
 * @param name name to store the entity under, or {@code null} to keep the name from the resource
 * @param resource classpath resource holding the entity XML, resolved against this class
 * @param writeEndpoint write endpoint for a cluster, or {@code null}; not used for other types
 * @return the published entity
 * @throws IllegalArgumentException if {@code type} is not one of the handled types
 * @throws Exception if unmarshalling, publishing or file creation fails
 */
protected Entity storeEntity(EntityType type, String name, String resource, String writeEndpoint) throws Exception {
    Unmarshaller xmlReader = type.getUnmarshaller();
    ConfigurationStore configStore = ConfigurationStore.get();

    if (type == EntityType.CLUSTER) {
        Cluster loadedCluster = (Cluster) xmlReader.unmarshal(this.getClass().getResource(resource));
        if (name != null) {
            configStore.remove(type, name);
            loadedCluster.setName(name);
        }
        configStore.publish(type, loadedCluster);

        if (writeEndpoint != null) {
            ClusterHelper.getInterface(loadedCluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
            FileSystem clusterFs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
            // Touch the extension jars that the lib-extension assertions look for.
            String[] extensionJars = {
                "libext/FEED/retention/ext.jar",
                "libext/FEED/replication/ext.jar",
            };
            for (String extensionJar : extensionJars) {
                Path jarPath = new Path(ClusterHelper.getLocation(loadedCluster, "working"), extensionJar);
                clusterFs.create(jarPath).close();
            }
        }
        return loadedCluster;
    }

    if (type == EntityType.FEED) {
        Feed loadedFeed = (Feed) xmlReader.unmarshal(this.getClass().getResource(resource));
        if (name != null) {
            configStore.remove(type, name);
            loadedFeed.setName(name);
        }
        configStore.publish(type, loadedFeed);
        return loadedFeed;
    }

    if (type == EntityType.PROCESS) {
        Process loadedProcess = (Process) xmlReader.unmarshal(this.getClass().getResource(resource));
        if (name != null) {
            configStore.remove(type, name);
            loadedProcess.setName(name);
        }
        configStore.publish(type, loadedProcess);
        return loadedProcess;
    }

    throw new IllegalArgumentException("Unhandled type: " + type);
}
/**
 * Reads a coordinator definition from the file system and unmarshals it, validating the XML
 * against the {@code /oozie-coordinator-0.3.xsd} schema loaded from the classpath.
 *
 * @param fs file system holding the coordinator XML
 * @param path path of the coordinator XML file
 * @return the unmarshalled coordinator application
 * @throws Exception if the file cannot be read or the XML cannot be unmarshalled
 */
protected COORDINATORAPP getCoordinator(FileSystem fs, Path path) throws Exception {
    String coordinatorXml = readFile(fs, path).trim();

    Schema coordinatorSchema = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema")
        .newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
    Unmarshaller xmlReader = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
    xmlReader.setSchema(coordinatorSchema);

    StreamSource xmlSource = new StreamSource(new ByteArrayInputStream(coordinatorXml.getBytes()));
    JAXBElement<COORDINATORAPP> coordinatorElement = xmlReader.unmarshal(xmlSource, COORDINATORAPP.class);
    return coordinatorElement.getValue();
}
/**
 * Reads {@code bundle.xml} from the given directory and unmarshals it, validating the XML
 * against the {@code /oozie-bundle-0.1.xsd} schema loaded from the classpath.
 *
 * @param fs file system holding the bundle
 * @param path directory that contains {@code bundle.xml}
 * @return the unmarshalled bundle application
 * @throws Exception if the file cannot be read or the XML cannot be unmarshalled
 */
protected BUNDLEAPP getBundle(FileSystem fs, Path path) throws Exception {
    Path bundleFile = new Path(path, "bundle.xml");
    String bundleXml = readFile(fs, bundleFile).trim();

    Schema bundleSchema = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema")
        .newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
    Unmarshaller xmlReader = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
    xmlReader.setSchema(bundleSchema);

    StreamSource xmlSource = new StreamSource(new ByteArrayInputStream(bundleXml.getBytes()));
    JAXBElement<BUNDLEAPP> bundleElement = xmlReader.unmarshal(xmlSource, BUNDLEAPP.class);
    return bundleElement.getValue();
}
/**
 * Reads a text file from the given file system into a single string.
 *
 * <p>Lines are concatenated without their line terminators, so the result contains no line breaks.
 * The bytes are decoded with the platform default charset. The underlying stream is always closed,
 * whether reading succeeds or fails.
 *
 * @param fs file system to read from
 * @param path file to read
 * @return the file contents with line terminators removed
 * @throws Exception if the file cannot be opened or read
 */
protected String readFile(FileSystem fs, Path path) throws Exception {
    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
    try {
        StringBuilder contents = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            contents.append(line);
        }
        return contents.toString();
    } finally {
        // Closing the reader also closes the stream obtained from fs.open(path).
        reader.close();
    }
}
/**
 * Removes every entity of every {@link EntityType} from the configuration store.
 *
 * @throws FalconException if the store fails to remove an entity
 */
protected void cleanupStore() throws FalconException {
    ConfigurationStore store = ConfigurationStore.get();
    for (EntityType type : EntityType.values()) {
        // Snapshot the names before removing: the returned collection may be a live view of the
        // store (not visible from here), and removing while iterating such a view is unsafe.
        String[] entityNames = store.getEntities(type).toArray(new String[0]);
        for (String entityName : entityNames) {
            store.remove(type, entityName);
        }
    }
}
/**
 * Asserts that every java, pig and map-reduce action of the coordinator's workflow lists the
 * lib-extension jar for the given entity type and lifecycle as its last file.
 *
 * <p>The expected file ends with
 * {@code /projects/falcon/working/libext/<TYPE>[/<lifecycle>]/ext.jar}. Workflow nodes that are not
 * actions, and actions of other kinds, are skipped. An action whose file list is empty fails the
 * assertion instead of raising an {@link IndexOutOfBoundsException}.
 *
 * @param fs file system holding the workflow definition
 * @param coord coordinator whose workflow is inspected
 * @param type entity type whose extension directory is expected
 * @param lifecycle lifecycle sub-directory, or {@code null} when there is none
 * @throws Exception if the workflow cannot be loaded
 */
protected void assertLibExtensions(FileSystem fs, COORDINATORAPP coord, EntityType type,
                                   String lifecycle) throws Exception {
    WORKFLOWAPP wf = getWorkflowapp(fs, coord);
    List<Object> actions = wf.getDecisionOrForkOrJoin();
    String lifeCyclePath = lifecycle == null ? "" : "/" + lifecycle;
    for (Object obj : actions) {
        if (!(obj instanceof ACTION)) {
            continue;
        }
        ACTION action = (ACTION) obj;
        List<String> files = null;
        if (action.getJava() != null) {
            files = action.getJava().getFile();
        } else if (action.getPig() != null) {
            files = action.getPig().getFile();
        } else if (action.getMapReduce() != null) {
            files = action.getMapReduce().getFile();
        }
        if (files == null) {
            continue;
        }

        String expectedSuffix = "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar";
        // Guard the index below so a missing file list is reported as a readable assertion failure.
        Assert.assertFalse(files.isEmpty(),
            "Action " + action.getName() + " has no files; expected one ending with " + expectedSuffix);
        String lastFile = files.get(files.size() - 1);
        Assert.assertTrue(lastFile.endsWith(expectedSuffix),
            "Last file of action " + action.getName() + " is " + lastFile
                + "; expected it to end with " + expectedSuffix);
    }
}
/**
 * Loads the workflow definition referenced by a coordinator.
 *
 * <p>The coordinator's workflow app path is used with any <code>${nameNode}</code> placeholder
 * removed, and {@code workflow.xml} is read from that directory.
 *
 * @param fs file system holding the workflow definition
 * @param coord coordinator whose workflow app path is resolved
 * @return the unmarshalled workflow application
 * @throws JAXBException if the workflow XML cannot be unmarshalled
 * @throws IOException if the workflow file cannot be opened
 */
@SuppressWarnings("unchecked")
protected WORKFLOWAPP getWorkflowapp(FileSystem fs, COORDINATORAPP coord) throws JAXBException, IOException {
    String declaredAppPath = coord.getAction().getWorkflow().getAppPath();
    String appDir = declaredAppPath.replace("${nameNode}", "");
    Path workflowFile = new Path(appDir, "workflow.xml");
    return getWorkflowapp(fs, workflowFile);
}
/**
 * Unmarshals a workflow definition from the given file.
 *
 * <p>The stream opened on {@code path} is always closed, whether unmarshalling succeeds or fails.
 *
 * @param fs file system holding the workflow definition
 * @param path path of the workflow XML file
 * @return the unmarshalled workflow application
 * @throws JAXBException if the workflow XML cannot be unmarshalled
 * @throws IOException if the file cannot be opened or closed
 */
@SuppressWarnings("unchecked")
protected WORKFLOWAPP getWorkflowapp(FileSystem fs, Path path) throws JAXBException, IOException {
    JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
    Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
    InputStream workflowStream = fs.open(path);
    try {
        // unmarshal(InputStream) yields a JAXBElement wrapper here; the cast is the unchecked part.
        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(workflowStream)).getValue();
    } finally {
        workflowStream.close();
    }
}
/**
 * Finds the workflow action with the given name.
 *
 * @param wf workflow to search
 * @param name action name to look for
 * @return the first action whose name equals {@code name}
 * @throws IllegalArgumentException if the workflow has no action with that name
 */
protected ACTION getAction(WORKFLOWAPP wf, String name) {
    for (Object node : wf.getDecisionOrForkOrJoin()) {
        // The node list mixes several node kinds; only actions are candidates.
        if (!(node instanceof ACTION)) {
            continue;
        }
        ACTION candidate = (ACTION) node;
        if (candidate.getName().equals(name)) {
            return candidate;
        }
    }
    throw new IllegalArgumentException("Invalid action name " + name);
}
/**
 * Asserts that the workflow contains the named action and, optionally, that the action is
 * configured with a retry max of "3" and a retry interval of "1".
 *
 * @param wf workflow to inspect
 * @param name name of the action that must exist
 * @param assertRetry whether the retry settings are verified as well
 * @throws IllegalArgumentException if the workflow has no action with that name
 */
protected void assertAction(WORKFLOWAPP wf, String name, boolean assertRetry) {
    ACTION located = getAction(wf, name);
    Assert.assertNotNull(located);
    if (!assertRetry) {
        return;
    }
    Assert.assertEquals(located.getRetryMax(), "3");
    Assert.assertEquals(located.getRetryInterval(), "1");
}
/**
 * Collects the properties of the coordinator's workflow configuration into a map.
 *
 * @param coord coordinator whose workflow configuration is read
 * @return property values keyed by property name; a later duplicate name overwrites an earlier one
 */
protected HashMap<String, String> getCoordProperties(COORDINATORAPP coord) {
    HashMap<String, String> valuesByName = new HashMap<String, String>();
    CONFIGURATION workflowConf = coord.getAction().getWorkflow().getConfiguration();
    for (CONFIGURATION.Property entry : workflowConf.getProperty()) {
        valuesByName.put(entry.getName(), entry.getValue());
    }
    return valuesByName;
}
/**
 * Asserts that the workflow properties describe the given entity, cluster and operation.
 *
 * <p>Checked in order: entity name, entity type, cluster name, log directory and the
 * {@code falconDataOperation} property.
 *
 * @param entity entity the properties were generated for
 * @param cluster cluster the properties were generated for
 * @param operation expected data operation
 * @param props workflow properties to verify
 * @throws Exception declared for callers; the assertions themselves fail with assertion errors
 */
protected void verifyEntityProperties(Entity entity, Cluster cluster,
                                      WorkflowExecutionContext.EntityOperations operation,
                                      HashMap<String, String> props) throws Exception {
    String actualEntityName = props.get(WorkflowExecutionArgs.ENTITY_NAME.getName());
    Assert.assertEquals(actualEntityName, entity.getName());

    String actualEntityType = props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName());
    Assert.assertEquals(actualEntityType, entity.getEntityType().name());

    String actualClusterName = props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName());
    Assert.assertEquals(actualClusterName, cluster.getName());

    String actualLogDir = props.get(WorkflowExecutionArgs.LOG_DIR.getName());
    Assert.assertEquals(actualLogDir, getLogPath(cluster, entity));

    String actualOperation = props.get("falconDataOperation");
    Assert.assertEquals(actualOperation, operation.name());
}
/**
 * Returns the expected log directory property value for an entity on a cluster.
 *
 * @param cluster cluster the entity runs on
 * @param entity entity whose log path is resolved
 * @return the log path as a string, prefixed with <code>${nameNode}</code> when it has no URI scheme
 */
private String getLogPath(Cluster cluster, Entity entity) {
    Path entityLogPath = EntityUtil.getLogPath(cluster, entity);
    if (entityLogPath.toUri().getScheme() == null) {
        return "${nameNode}" + entityLogPath;
    }
    return entityLogPath.toString();
}
/**
 * Asserts that the workflow properties carry the expected message broker settings.
 *
 * <p>The user broker URL and implementation class are compared with the values
 * {@link ClusterHelper} reports for the cluster. The Falcon broker URL, implementation class and
 * message TTL are compared with the startup properties {@code broker.url},
 * {@code broker.impl.class} and {@code broker.ttlInMins}, each with a fallback default.
 *
 * @param cluster cluster whose user broker settings are expected
 * @param props workflow properties to verify
 */
protected void verifyBrokerProperties(Cluster cluster, HashMap<String, String> props) {
    String actualUserBrokerUrl = props.get(WorkflowExecutionArgs.USER_BRKR_URL.getName());
    Assert.assertEquals(actualUserBrokerUrl, ClusterHelper.getMessageBrokerUrl(cluster));

    String actualUserBrokerImpl = props.get(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName());
    Assert.assertEquals(actualUserBrokerImpl, ClusterHelper.getMessageBrokerImplClass(cluster));

    String expectedBrokerUrl = StartupProperties.get().getProperty(
        "broker.url", "tcp://localhost:61616?daemon=true");
    String actualBrokerUrl = props.get(WorkflowExecutionArgs.BRKR_URL.getName());
    Assert.assertEquals(actualBrokerUrl, expectedBrokerUrl);

    String expectedBrokerImpl = StartupProperties.get().getProperty(
        "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
    String actualBrokerImpl = props.get(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName());
    Assert.assertEquals(actualBrokerImpl, expectedBrokerImpl);

    // Fallback TTL is three days expressed in minutes.
    String expectedTtl = StartupProperties.get().getProperty("broker.ttlInMins",
        String.valueOf(3 * 24 * 60L));
    String actualTtl = props.get(WorkflowExecutionArgs.BRKR_TTL.getName());
    Assert.assertEquals(actualTtl, expectedTtl);
}
}