/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.falcon.resource; |
| |
| import com.sun.jersey.api.client.Client; |
| import com.sun.jersey.api.client.ClientResponse; |
| import com.sun.jersey.api.client.WebResource; |
| import com.sun.jersey.api.client.config.ClientConfig; |
| import com.sun.jersey.api.client.config.DefaultClientConfig; |
| import org.apache.commons.lang.RandomStringUtils; |
| import org.apache.falcon.FalconException; |
| import org.apache.falcon.cluster.util.EmbeddedCluster; |
| import org.apache.falcon.cluster.util.StandAloneCluster; |
| import org.apache.falcon.entity.store.ConfigurationStore; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.SchemaHelper; |
| import org.apache.falcon.entity.v0.cluster.Cluster; |
| import org.apache.falcon.entity.v0.feed.Feed; |
| import org.apache.falcon.util.StartupProperties; |
| import org.apache.falcon.workflow.engine.OozieClientFactory; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.oozie.client.BundleJob; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.Job.Status; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.testng.Assert; |
| |
| import javax.servlet.ServletInputStream; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.UriBuilder; |
| import javax.xml.bind.JAXBContext; |
| import javax.xml.bind.JAXBException; |
| import javax.xml.bind.Marshaller; |
| import javax.xml.bind.Unmarshaller; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.StringReader; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| /** |
| * Base test class for CLI, Entity and Process Instances. |
| */ |
| public class TestContext { |
| public static final String FEED_TEMPLATE1 = "/feed-template1.xml"; |
| public static final String FEED_TEMPLATE2 = "/feed-template2.xml"; |
| |
| public static final String CLUSTER_TEMPLATE = "/cluster-template.xml"; |
| |
| public static final String SAMPLE_PROCESS_XML = "/process-version-0.xml"; |
| public static final String PROCESS_TEMPLATE = "/process-template.xml"; |
| |
| public static final String BASE_URL = "http://localhost:41000/falcon-webapp"; |
| public static final String REMOTE_USER = System.getProperty("user.name"); |
| |
| protected Unmarshaller unmarshaller; |
| protected Marshaller marshaller; |
| |
| protected EmbeddedCluster cluster; |
| protected WebResource service = null; |
| protected String clusterName; |
| protected String processName; |
| protected String outputFeedName; |
| |
| public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_]*##"); |
| |
| public Unmarshaller getUnmarshaller() { |
| return unmarshaller; |
| } |
| |
| public Marshaller getMarshaller() { |
| return marshaller; |
| } |
| |
| public EmbeddedCluster getCluster() { |
| return cluster; |
| } |
| |
| public WebResource getService() { |
| return service; |
| } |
| |
| public String getClusterName() { |
| return clusterName; |
| } |
| |
| public String getProcessName() { |
| return processName; |
| } |
| |
| public String getOutputFeedName() { |
| return outputFeedName; |
| } |
| |
| public String getClusterFileTemplate() { |
| return CLUSTER_TEMPLATE; |
| } |
| |
| public void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception { |
| ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); |
| assertSuccessful(response); |
| |
| response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED); |
| assertSuccessful(response); |
| |
| response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED); |
| assertSuccessful(response); |
| |
| response = submitToFalcon(processTemplate, overlay, EntityType.PROCESS); |
| assertSuccessful(response); |
| ClientResponse clientRepsonse = this.service.path("api/entities/schedule/process/" + processName) |
| .header("Remote-User", REMOTE_USER).accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post( |
| ClientResponse.class); |
| assertSuccessful(clientRepsonse); |
| } |
| |
| public void scheduleProcess() throws Exception { |
| scheduleProcess(PROCESS_TEMPLATE, getUniqueOverlay()); |
| } |
| |
| private List<WorkflowJob> getRunningJobs(String entityName) throws Exception { |
| OozieClient ozClient = OozieClientFactory.get(cluster.getCluster()); |
| StringBuilder builder = new StringBuilder(); |
| builder.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING).append(';'); |
| builder.append(OozieClient.FILTER_NAME).append('=').append("FALCON_PROCESS_DEFAULT_").append(entityName); |
| return ozClient.getJobsInfo(builder.toString()); |
| } |
| |
| public void waitForWorkflowStart(String entityName) throws Exception { |
| for (int i = 0; i < 10; i++) { |
| List<WorkflowJob> jobs = getRunningJobs(entityName); |
| if (jobs != null && !jobs.isEmpty()) { |
| return; |
| } |
| |
| System.out.println("Waiting for workflow to start"); |
| Thread.sleep(i * 1000); |
| } |
| throw new Exception("Workflow for " + entityName + " hasn't started in oozie"); |
| } |
| |
| public void waitForProcessWFtoStart() throws Exception { |
| waitForWorkflowStart(processName); |
| } |
| |
| public void waitForOutputFeedWFtoStart() throws Exception { |
| waitForWorkflowStart(outputFeedName); |
| } |
| |
| public void waitForBundleStart(Status status) throws Exception { |
| OozieClient ozClient = OozieClientFactory.get(cluster.getCluster()); |
| List<BundleJob> bundles = getBundles(); |
| if (bundles.isEmpty()) { |
| return; |
| } |
| |
| String bundleId = bundles.get(0).getId(); |
| for (int i = 0; i < 15; i++) { |
| Thread.sleep(i * 1000); |
| BundleJob bundle = ozClient.getBundleJobInfo(bundleId); |
| if (bundle.getStatus() == status) { |
| if (status == Status.FAILED) { |
| return; |
| } |
| |
| boolean done = false; |
| for (CoordinatorJob coord : bundle.getCoordinators()) { |
| if (coord.getStatus() == status) { |
| done = true; |
| } |
| } |
| if (done) { |
| return; |
| } |
| } |
| System.out.println("Waiting for bundle " + bundleId + " in " + status + " state"); |
| } |
| throw new Exception("Bundle " + bundleId + " is not " + status + " in oozie"); |
| } |
| |
| public TestContext() { |
| try { |
| JAXBContext jaxbContext = JAXBContext.newInstance(APIResult.class, Feed.class, Process.class, Cluster.class, |
| InstancesResult.class); |
| unmarshaller = jaxbContext.createUnmarshaller(); |
| marshaller = jaxbContext.createMarshaller(); |
| |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| configure(); |
| } |
| |
| public void configure() { |
| StartupProperties.get().setProperty( |
| "application.services", |
| StartupProperties.get().getProperty("application.services") |
| .replace("org.apache.falcon.service.ProcessSubscriberService", "")); |
| String store = StartupProperties.get().getProperty("config.store.uri"); |
| StartupProperties.get().setProperty("config.store.uri", store + System.currentTimeMillis()); |
| ClientConfig config = new DefaultClientConfig(); |
| Client client = Client.create(config); |
| this.service = client.resource(UriBuilder.fromUri(BASE_URL).build()); |
| } |
| |
| public void setCluster(String file) throws Exception { |
| cluster = StandAloneCluster.newCluster(file); |
| clusterName = cluster.getCluster().getName(); |
| } |
| |
| /** |
| * Converts a InputStream into ServletInputStream. |
| * |
| * @param fileName |
| * @return ServletInputStream |
| * @throws java.io.IOException |
| */ |
| public ServletInputStream getServletInputStream(String fileName) throws IOException { |
| return getServletInputStream(new FileInputStream(fileName)); |
| } |
| |
| public ServletInputStream getServletInputStream(final InputStream stream) { |
| return new ServletInputStream() { |
| |
| @Override |
| public int read() throws IOException { |
| return stream.read(); |
| } |
| }; |
| } |
| |
| public ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType) |
| throws Exception { |
| String tmpFile = overlayParametersOverTemplate(template, overlay); |
| ServletInputStream rawlogStream = getServletInputStream(tmpFile); |
| |
| return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase()) |
| .header("Remote-User", "testuser").accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) |
| .post(ClientResponse.class, rawlogStream); |
| } |
| |
| public ClientResponse submitToFalcon(String template, Map<String, String> overlay, EntityType entityType) |
| throws IOException { |
| String tmpFile = overlayParametersOverTemplate(template, overlay); |
| if (entityType == EntityType.CLUSTER) { |
| try { |
| cluster = StandAloneCluster.newCluster(tmpFile); |
| clusterName = cluster.getCluster().getName(); |
| } catch (Exception e) { |
| throw new IOException("Unable to setup cluster info", e); |
| } |
| } |
| return submitFileToFalcon(entityType, tmpFile); |
| } |
| |
| private ClientResponse submitFileToFalcon(EntityType entityType, String tmpFile) throws IOException { |
| |
| ServletInputStream rawlogStream = getServletInputStream(tmpFile); |
| |
| return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User", |
| "testuser") |
| .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream); |
| } |
| |
| public void assertRequestId(ClientResponse clientRepsonse) { |
| String response = clientRepsonse.getEntity(String.class); |
| try { |
| APIResult result = (APIResult) unmarshaller.unmarshal(new StringReader(response)); |
| Assert.assertNotNull(result.getRequestId()); |
| } catch (JAXBException e) { |
| Assert.fail("Reponse " + response + " is not valid"); |
| } |
| } |
| |
| public void assertStatus(ClientResponse clientRepsonse, APIResult.Status status) { |
| String response = clientRepsonse.getEntity(String.class); |
| try { |
| APIResult result = (APIResult) unmarshaller.unmarshal(new StringReader(response)); |
| Assert.assertEquals(result.getStatus(), status); |
| } catch (JAXBException e) { |
| Assert.fail("Reponse " + response + " is not valid"); |
| } |
| } |
| |
| public void assertFailure(ClientResponse clientRepsonse) { |
| Assert.assertEquals(clientRepsonse.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); |
| assertStatus(clientRepsonse, APIResult.Status.FAILED); |
| } |
| |
| public void assertSuccessful(ClientResponse clientRepsonse) { |
| Assert.assertEquals(clientRepsonse.getStatus(), Response.Status.OK.getStatusCode()); |
| assertStatus(clientRepsonse, APIResult.Status.SUCCEEDED); |
| } |
| |
| public String overlayParametersOverTemplate(String template, Map<String, String> overlay) throws IOException { |
| File tmpFile = getTempFile(); |
| OutputStream out = new FileOutputStream(tmpFile); |
| |
| InputStreamReader in; |
| if (getClass().getResourceAsStream(template) == null) { |
| in = new FileReader(template); |
| } else { |
| in = new InputStreamReader(getClass().getResourceAsStream(template)); |
| } |
| BufferedReader reader = new BufferedReader(in); |
| String line; |
| while ((line = reader.readLine()) != null) { |
| Matcher matcher = VAR_PATTERN.matcher(line); |
| while (matcher.find()) { |
| String variable = line.substring(matcher.start(), matcher.end()); |
| line = line.replace(variable, overlay.get(variable.substring(2, variable.length() - 2))); |
| matcher = VAR_PATTERN.matcher(line); |
| } |
| out.write(line.getBytes()); |
| out.write("\n".getBytes()); |
| } |
| reader.close(); |
| out.close(); |
| return tmpFile.getAbsolutePath(); |
| } |
| |
| public File getTempFile() throws IOException { |
| File target = new File("webapp/target"); |
| if (!target.exists()) { |
| target = new File("target"); |
| } |
| |
| return File.createTempFile("test", ".xml", target); |
| } |
| |
| public List<BundleJob> getBundles() throws Exception { |
| List<BundleJob> bundles = new ArrayList<BundleJob>(); |
| if (clusterName == null) { |
| return bundles; |
| } |
| |
| OozieClient ozClient = OozieClientFactory.get(cluster.getCluster()); |
| return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + processName, 0, 10); |
| } |
| |
| public boolean killOozieJobs() throws Exception { |
| if (cluster == null) { |
| return true; |
| } |
| |
| OozieClient ozClient = OozieClientFactory.get(cluster.getCluster()); |
| List<BundleJob> bundles = getBundles(); |
| if (bundles != null) { |
| for (BundleJob bundle : bundles) { |
| ozClient.kill(bundle.getId()); |
| } |
| } |
| return false; |
| } |
| |
| public Map<String, String> getUniqueOverlay() throws FalconException { |
| Map<String, String> overlay = new HashMap<String, String>(); |
| long time = System.currentTimeMillis(); |
| clusterName = "cluster" + time; |
| overlay.put("cluster", clusterName); |
| overlay.put("inputFeedName", "in" + time); |
| //only feeds with future dates can be scheduled |
| Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000); |
| overlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate)); |
| overlay.put("outputFeedName", "out" + time); |
| processName = "p" + time; |
| overlay.put("processName", processName); |
| outputFeedName = "out" + time; |
| return overlay; |
| } |
| |
| public static void prepare() throws Exception { |
| |
| Map<String, String> overlay = new HashMap<String, String>(); |
| overlay.put("cluster", RandomStringUtils.randomAlphabetic(5)); |
| TestContext context = new TestContext(); |
| String file = context. |
| overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay); |
| EmbeddedCluster cluster = StandAloneCluster.newCluster(file); |
| |
| cleanupStore(); |
| |
| // setup dependent workflow and lipath in hdfs |
| FileSystem fs = FileSystem.get(cluster.getConf()); |
| fs.mkdirs(new Path("/falcon"), new FsPermission((short) 511)); |
| |
| Path wfParent = new Path("/falcon/test"); |
| fs.delete(wfParent, true); |
| Path wfPath = new Path(wfParent, "workflow"); |
| fs.mkdirs(wfPath); |
| fs.copyFromLocalFile(false, true, new Path(TestContext.class.getResource("/fs-workflow.xml").getPath()), |
| new Path(wfPath, |
| "workflow.xml")); |
| fs.mkdirs(new Path(wfParent, "input/2012/04/20/00")); |
| Path outPath = new Path(wfParent, "output"); |
| fs.mkdirs(outPath); |
| fs.setPermission(outPath, new FsPermission((short) 511)); |
| } |
| |
| public static void cleanupStore() throws Exception { |
| for (EntityType type : EntityType.values()) { |
| for (String name : ConfigurationStore.get().getEntities(type)) { |
| ConfigurationStore.get().remove(type, name); |
| } |
| } |
| } |
| } |