FALCON-2199 Delete API support for extension job (user extension)
This pull request is dependent on https://github.com/apache/falcon/pull/331
Author: sandeep <sandysmdl@gmail.com>
Reviewers: @pallavi-rao
Closes #333 from sandeepSamudrala/FALCON-2199
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
index 5e2f26d..f6f3346 100644
--- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -54,7 +54,7 @@
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
- private static final String NAME = "name";
+ private static final String NAME = "extensionName";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";
@@ -185,8 +185,6 @@
for (File innerFile : files) {
if (innerFile.isFile()) {
urls.add(innerFile.toURI().toURL());
- } else {
- urls.addAll(getFilesInPath(file.toURI().toURL()));
}
}
}
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 8cdbf30..e4ce993 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -203,7 +203,6 @@
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
- * @throws FalconCLIException
*/
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);
@@ -216,7 +215,6 @@
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
- * @throws FalconCLIException
*/
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);
@@ -227,11 +225,16 @@
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
- * @throws FalconCLIException
*/
public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);
/**
+ * Deletes the entities that are part of the extension job and then deleted the job from the DB.
+ * @param jobName name of the extension job that needs to be deleted.
+ * @return APIResult status of the deletion query.
+ */
+ public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);
+ /**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
* @return
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 9fb0dd4..f1ed6f5 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -20,7 +20,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.EntityNotRegisteredException;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
@@ -33,7 +37,11 @@
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
/**
* A base class for managing Extension Operations.
@@ -106,6 +114,28 @@
}
}
+ protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
+ throws FalconException, IOException {
+ TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
+ List<String> processes = extensionJobsBean.getProcesses();
+ List<String> feeds = extensionJobsBean.getFeeds();
+ entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
+ entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
+ return entityMap;
+ }
+
+ private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
+ List<Entity> entities = new ArrayList<>();
+ for (String entityName : entityNames) {
+ try {
+ entities.add(EntityUtil.getEntity(entityType, entityName));
+ } catch (EntityNotRegisteredException e) {
+ LOG.error("Entity {} not found during deletion nothing to do", entityName);
+ }
+ }
+ return entities;
+ }
+
private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
index a07a6d4..ae0a61a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
@@ -20,6 +20,9 @@
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.EntityNotRegisteredException;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.channel.Channel;
@@ -36,6 +39,9 @@
import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos;
import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG;
+/**
+ * Proxy Util class to proxy entity management apis from prism to servers.
+ */
class EntityProxyUtil {
private final Map<String, Channel> entityManagerChannels = new HashMap<>();
private final Map<String, Channel> configSyncChannels = new HashMap<>();
@@ -89,6 +95,31 @@
return results;
}
+ Map<String, APIResult> proxyDelete(final String type, final String entityName,
+ final HttpServletRequest bufferedRequest) {
+ Map<String, APIResult> results = new HashMap<>();
+ results.put(FALCON_TAG, new EntityProxy(type, entityName) {
+ @Override
+ public APIResult execute() {
+ try {
+ EntityUtil.getEntity(type, entityName);
+ return super.execute();
+ } catch (EntityNotRegisteredException e) {
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ entityName + "(" + type + ") doesn't exist. Nothing to do");
+ } catch (FalconException e) {
+ throw FalconWebException.newAPIException(e);
+ }
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
+ }
+ }.execute());
+ return results;
+ }
+
Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 0e79f12..551dbbf 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -96,7 +96,6 @@
private String currentColo = DeploymentUtil.getCurrentColo();
private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();
-
private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json";
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@GET
@@ -271,28 +270,26 @@
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult delete(@PathParam("job-name") String jobName,
+ @Context HttpServletRequest request,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
- try {
- List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
- if (entities.isEmpty()) {
- // return failure if the extension job doesn't exist
- return new APIResult(APIResult.Status.SUCCEEDED,
- "Extension job " + jobName + " doesn't exist. Nothing to delete.");
- }
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobsBean == null) {
+ // return failure if the extension job doesn't exist
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ "Extension job " + jobName + " doesn't exist. Nothing to delete.");
+ }
- for (Entity entity : entities) {
- // TODO(yzheng): need to remember the entity dependency graph for clean ordered removal
- canRemove(entity);
- if (entity.getEntityType().isSchedulable() && !DeploymentUtil.isPrism()) {
- getWorkflowEngine(entity).delete(entity);
- }
- configStore.remove(entity.getEntityType(), entity.getName());
- }
- } catch (FalconException | IOException e) {
+ SortedMap<EntityType, List<Entity>> entityMap;
+ try {
+ entityMap = getJobEntities(extensionJobsBean);
+ deleteEntities(entityMap, request);
+ } catch (FalconException | IOException | JAXBException e) {
LOG.error("Error when deleting extension job: " + jobName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
+ metaStore.deleteExtensionJob(jobName);
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
}
@@ -423,6 +420,21 @@
return new BufferedRequest(request);
}
+ private void deleteEntities(SortedMap<EntityType, List<Entity>> entityMap, HttpServletRequest request)
+ throws IOException, JAXBException {
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (final Entity entity : entry.getValue()) {
+ final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
+ final String entityType = entity.getEntityType().toString();
+ final String entityName = entity.getName();
+ entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest);
+ if (!embeddedMode) {
+ super.delete(bufferedRequest, entityType, entityName, currentColo);
+ }
+ }
+ }
+ }
+
private void submitEntities(String extensionName, String jobName,
SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
HttpServletRequest request) throws FalconException, IOException, JAXBException {
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 74a1acc..8f41c48 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -21,7 +21,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
-import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -222,29 +221,8 @@
throw FalconWebException.newAPIException(e);
}
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- Map<String, APIResult> results = new HashMap<String, APIResult>();
-
- results.put(FALCON_TAG, new EntityProxy(type, entityName) {
- @Override
- public APIResult execute() {
- try {
- EntityUtil.getEntity(type, entityName);
- return super.execute();
- } catch (EntityNotRegisteredException e) {
- return new APIResult(APIResult.Status.SUCCEEDED,
- entityName + "(" + type + ") doesn't exist. Nothing to do");
- } catch (FalconException e) {
- throw FalconWebException.newAPIException(e);
- }
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return entityProxyUtil.getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName,
- colo);
- }
- }.execute());
-
+ Map<String, APIResult> results = new HashMap<>();
+ results.putAll(entityProxyUtil.proxyDelete(type, entityName, bufferedRequest));
// delete only if deleted from everywhere
if (!embeddedMode && results.get(FALCON_TAG).getStatus() == APIResult.Status.SUCCEEDED) {
results.put(PRISM_TAG, super.delete(bufferedRequest, type, entityName, currentColo));
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 23f4cf1..16830f9 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -116,11 +116,14 @@
if (entity.getEntityType() != EntityType.PROCESS){
return;
}
- backlogMetricStore.deleteEntityInstance(entity.getName());
- entityBacklogs.remove(entity);
- Process process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
- for(Cluster cluster : process.getClusters().getClusters()){
- dropMetric(cluster.getName(), process);
+ Process process = (Process) entity;
+ if (process.getSla() != null) {
+ backlogMetricStore.deleteEntityInstance(entity.getName());
+ entityBacklogs.remove(entity);
+ process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
+ for (Cluster cluster : process.getClusters().getClusters()) {
+ dropMetric(cluster.getName(), process);
+ }
}
}
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 66b8e9b..9ed2a0d 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -336,6 +336,7 @@
}
}
+ @Override
public APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) {
InputStream configStream = getServletInputStream(configPath);
try {
@@ -344,11 +345,21 @@
return localExtensionManager.updateExtensionJob(extensionName, jobName, configStream,
entityMap);
} catch (FalconException | IOException e) {
- throw new FalconCLIException("Failed in updating the extension job " + jobName);
+ throw new FalconCLIException("Failed in updating the extension job:" + jobName);
}
}
@Override
+ public APIResult deleteExtensionJob(String jobName, String doAsUser) {
+ try {
+ return localExtensionManager.deleteExtensionJob(jobName);
+ } catch (FalconException | IOException e) {
+ throw new FalconCLIException("Failed to delete the extension job:" + jobName);
+ }
+ }
+
+
+ @Override
public APIResult getExtensionJobDetails(final String jobName) {
return localExtensionManager.getExtensionJobDetails(jobName);
}
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 7002dc8..0412ef2 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -22,7 +22,9 @@
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
+import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.security.CurrentUser;
@@ -86,6 +88,19 @@
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}
+ public APIResult deleteExtensionJob(String jobName) throws FalconException, IOException{
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (Entity entity : entry.getValue()) {
+ delete(entity.getEntityType().name(), entity.getName(), null);
+ }
+ }
+ ExtensionStore.getMetaStore().deleteExtensionJob(jobName);
+ return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
+ }
+
public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 690fdd5..9e836e7 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -245,6 +245,9 @@
return falconUnitClient.updateExtensionJob(jobName, configPath, doAsUser);
}
+ APIResult deleteExtensionJob(String jobName, String doAsUser) {
+ return falconUnitClient.deleteExtensionJob(jobName, doAsUser);
+ }
public static String overlayParametersOverTemplate(String template,
Map<String, String> overlay) throws IOException {
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 07d8195..a41743d 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -19,6 +19,7 @@
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Property;
@@ -72,6 +73,8 @@
private static final String EXTENSION_PATH = "/projects/falcon/extension/testExtension";
private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
private static final String EXTENSION_PROPERTIES = "extension.properties";
+ private static final String TEST_JOB = "testJob";
+ private static final String TEST_EXTENSION = "testExtension";
private FileSystem fileSystem;
private static final String STORAGE_URL = "jail://global:00";
@@ -431,39 +434,55 @@
}
@Test
- public void testSubmitAndScheduleExtensionJob() throws Exception {
+ public void testExtensionJobOperations() throws Exception {
clearDB();
submitCluster();
createExtensionPackage();
String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
- String result = registerExtension("testExtension", STORAGE_URL + EXTENSION_PATH, "testExtension");
+ String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
Assert.assertEquals(result, "Extension :testExtension registered successfully.");
createDir(PROCESS_APP_PATH);
copyExtensionJar(packageBuildLib);
- APIResult apiResult = submitAndScheduleExtensionJob("testExtension", "testJob", null, null);
+ APIResult apiResult = submitAndScheduleExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
- result = getExtensionJobDetails("testJob");
+ result = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(result);
- Assert.assertEquals(resultJson.get("extensionName"), "testExtension");
- Process process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
+ Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);
+ Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testPipeline");
apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");
- apiResult = updateExtensionJob("testJob", getAbsolutePath(EXTENSION_PROPERTIES), null);
+ apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null);
assertStatus(apiResult);
- String processes = new JSONObject(getExtensionJobDetails("testJob")).get("processes").toString();
+ String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString();
Assert.assertEquals(processes, "sample");
- process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
+ process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testSample");
apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");
+
+ apiResult = deleteExtensionJob(TEST_JOB, null);
+ assertStatus(apiResult);
+ try {
+ getEntity(EntityType.PROCESS, "sample");
+ Assert.fail("Should have thrown a validation exception");
+ } catch (EntityNotRegisteredException e) {
+ //Do nothing. Exception Expected
+ }
+ try {
+ getClient().getExtensionJobDetails(TEST_JOB);
+ Assert.fail("Should have thrown a FalconWebException");
+ } catch (FalconWebException e) {
+ Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob");
+ //Do nothing. Exception Expected.
+ }
}