FALCON-2231 Changes to support Schedule of user extensions
Author: sandeep <sandysmdl@gmail.com>
Reviewers: @pallavi,@pracheer,@praveen
Closes #334 from sandeepSamudrala/FALCON-2231 and squashes the following commits:
d32bf98 [sandeep] FALCON-2231 Fixed checkstyle issues.
2bbd7e2 [sandeep] FALCON-2231 Incoporated review comments and fixed test cases
2269806 [sandeep] FALCON-2231 Incoporated review comments and small fixes for duplicate submission and colo addition to schedule command
44d6f2a [sandeep] FALCON-2231 Corrected message in LocalExtensionManager
f165282 [sandeep] FALCON-2231 Updated Error messages and throwing out exception in case of extension not found while scheduling
96a9a1d [sandeep] FALCON-2231 Rebased my patch
ca320e0 [sandeep] FACLON-2231 Changes to support Schedule of user extensions
53831ea [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2231
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
index a8aea52..dcac8e8 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
@@ -369,7 +369,7 @@
OUT.get().println(result);
}
- private void validateColo(Set<String> optionsList) {
+ static void validateColo(Set<String> optionsList) {
if (optionsList.contains(FalconCLIConstants.COLO_OPT)) {
throw new FalconCLIException("Invalid argument : " + FalconCLIConstants.COLO_OPT);
}
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 0343aa8..2a105dc 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -33,38 +33,44 @@
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;
+import java.io.IOException;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.falcon.cli.FalconEntityCLI.validateColo;
+import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT;
+import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT_DESCRIPTION;
+
/**
* Falcon extensions Command Line Interface - wraps the RESTful API for extensions.
*/
-public class FalconExtensionCLI {
+public class FalconExtensionCLI extends FalconCLI{
public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);
// Extension commands
- public static final String ENUMERATE_OPT = "enumerate";
- public static final String DEFINITION_OPT = "definition";
- public static final String DESCRIBE_OPT = "describe";
- public static final String INSTANCES_OPT = "instances";
- public static final String UNREGISTER_OPT = "unregister";
- public static final String DETAIL_OPT = "detail";
- public static final String REGISTER_OPT = "register";
- public static final String ENABLE_OPT = "enable";
- public static final String DISABLE_OPT = "disable";
+ private static final String ENUMERATE_OPT = "enumerate";
+ private static final String DEFINITION_OPT = "definition";
+ private static final String DESCRIBE_OPT = "describe";
+ private static final String INSTANCES_OPT = "instances";
+ private static final String UNREGISTER_OPT = "unregister";
+ private static final String DETAIL_OPT = "detail";
+ private static final String REGISTER_OPT = "register";
+ private static final String ENABLE_OPT = "enable";
+ private static final String DISABLE_OPT = "disable";
// Input parameters
- public static final String EXTENSION_NAME_OPT = "extensionName";
- public static final String JOB_NAME_OPT = "jobName";
+ private static final String EXTENSION_NAME_OPT = "extensionName";
+ private static final String JOB_NAME_OPT = "jobName";
public static final String DESCRIPTION = "description";
- public static final String PATH = "path";
+ private static final String PATH = "path";
- public FalconExtensionCLI() {
+ FalconExtensionCLI() throws Exception {
+ super();
}
- public void extensionCommand(CommandLine commandLine, FalconClient client) {
+ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOException {
Set<String> optionsList = new HashSet<>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
@@ -77,6 +83,8 @@
String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
String path = commandLine.getOptionValue(FalconCLIConstants.PATH);
String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION);
+ String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT);
+ colo = getColo(colo);
if (optionsList.contains(ENUMERATE_OPT)) {
result = client.enumerateExtensions().getMessage();
@@ -105,6 +113,7 @@
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+ validateColo(optionsList);
result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(REGISTER_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
@@ -114,6 +123,7 @@
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+ validateColo(optionsList);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
@@ -125,7 +135,8 @@
result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
- result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
+ colo = getColo(colo);
+ result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
@@ -170,7 +181,7 @@
OUT.get().println(result);
}
- public Options createExtensionOptions() {
+ Options createExtensionOptions() {
Options extensionOptions = new Options();
Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions");
@@ -192,6 +203,8 @@
Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. This will "
+ "make the extension available for instantiation for all users.");
+ Option colo = new Option(COLO_OPT, true, COLO_OPT_DESCRIPTION);
+ colo.setRequired(false);
OptionGroup group = new OptionGroup();
group.addOption(enumerate);
@@ -249,6 +262,7 @@
extensionOptions.addOption(filePath);
extensionOptions.addOption(path);
extensionOptions.addOption(description);
+ extensionOptions.addOption(colo);
return extensionOptions;
}
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 3181b64..7b8a606 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -222,6 +222,13 @@
String doAsUser);
/**
+ * Schedules the set of entities that are part of the extension.
+ * @param jobName extensionJob that needs to be scheduled.
+ * @return APIResult stating status of scheduling the extension.
+ */
+ public abstract APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser);
+
+ /**
* Prepares set of entities the extension has implemented and stage them to a local directory and submits and
* schedules them.
* @param extensionName extension which is available in the store.
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 8401c9c..2772085 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1200,9 +1200,10 @@
}
}
- public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) {
+ public APIResult scheduleExtensionJob(String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SCHEDULE.path, jobName)
+ .addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SCHEDULE);
return getResponse(APIResult.class, clientResponse);
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index e53069a..9126b67 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -19,6 +19,7 @@
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
+import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.persistence.PersistenceConstants;
@@ -145,6 +146,11 @@
public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes,
byte[] config) {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ boolean alreadySubmitted = false;
+ if (metaStore.getExtensionJobDetails(jobName) != null){
+ alreadySubmitted = true;
+ }
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
Date currentTime = new Date(System.currentTimeMillis());
extensionJobsBean.setJobName(jobName);
@@ -157,7 +163,11 @@
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
- entityManager.persist(extensionJobsBean);
+ if (alreadySubmitted) {
+ entityManager.merge(extensionJobsBean);
+ } else {
+ entityManager.persist(extensionJobsBean);
+ }
} finally {
commitAndCloseTransaction(entityManager);
}
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
index 1688abb..e3327e8 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
@@ -83,6 +83,8 @@
byte[] config = new byte[0];
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);
+ //storing again to check for entity manager merge to let submission go forward.
+ stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);
Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 81b0448..8daf8c7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -128,12 +128,21 @@
}
protected Set<String> getColosFromExpression(String coloExpr, String type, String entity) {
- Set<String> colos;
final Set<String> applicableColos = getApplicableColos(type, entity);
+ return getColosFromExpression(coloExpr, applicableColos);
+ }
+
+ protected Set<String> getColosFromExpression(String coloExpr, String type, Entity entity) {
+ final Set<String> applicableColos = getApplicableColos(type, entity);
+ return getColosFromExpression(coloExpr, applicableColos);
+ }
+
+ private Set<String> getColosFromExpression(String coloExpr, Set<String> applicableColos) {
+ Set<String> colos;
if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) {
colos = applicableColos;
} else {
- colos = new HashSet<String>(Arrays.asList(coloExpr.split(",")));
+ colos = new HashSet<>(Arrays.asList(coloExpr.split(",")));
if (!applicableColos.containsAll(colos)) {
throw FalconWebException.newAPIException("Given colos not applicable for entity operation");
}
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 2b5cbe7..6f75dc7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -191,21 +191,26 @@
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult schedule(@PathParam("job-name") String jobName,
+ @Context HttpServletRequest request,
+ @QueryParam("colo") final String coloExpr,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName());
- try {
- List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
- if (entities.isEmpty()) {
- // return failure if the extension job doesn't exist
- return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
- }
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobsBean == null) {
+ // return failure if the extension job doesn't exist
+ LOG.error("Extension Job not found:" + jobName);
+ throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName,
+ Response.Status.NOT_FOUND);
+ }
- for (Entity entity : entities) {
- scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null);
- }
- } catch (FalconException | IOException e) {
- LOG.error("Error when scheduling extension job: " + jobName + ": ", e);
+ SortedMap<EntityType, List<Entity>> entityMap;
+ try {
+ entityMap = getJobEntities(extensionJobsBean);
+ scheduleEntities(entityMap, request, coloExpr);
+ } catch (FalconException | IOException | JAXBException e) {
+ LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
@@ -375,6 +380,7 @@
@Context HttpServletRequest request,
@DefaultValue("") @QueryParam("doAs") String doAsUser,
@QueryParam("jobName") String jobName,
+ @QueryParam("colo") final String coloExpr,
@FormDataParam("processes") List<FormDataBodyPart> processForms,
@FormDataParam("feeds") List<FormDataBodyPart> feedForms,
@FormDataParam("config") InputStream config) {
@@ -385,7 +391,7 @@
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
submitEntities(extensionName, jobName, entityMap, config, request);
- scheduleEntities(entityMap, request);
+ scheduleEntities(entityMap, request, coloExpr);
} catch (FalconException | IOException | JAXBException e) {
LOG.error("Error while submitting extension job: ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
@@ -393,13 +399,13 @@
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully");
}
- protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request)
+ private void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request, String coloExpr)
throws FalconException, JAXBException, IOException {
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (final Entity entity : entry.getValue()) {
final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request);
final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
- final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
+ final Set<String> colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity);
new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
@Override
@@ -441,8 +447,9 @@
}
private void submitEntities(String extensionName, String jobName,
- SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
- HttpServletRequest request) throws FalconException, IOException, JAXBException {
+ SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
+ HttpServletRequest request)
+ throws FalconException, IOException, JAXBException {
List<Entity> feeds = entityMap.get(EntityType.FEED);
List<Entity> processes = entityMap.get(EntityType.PROCESS);
validateFeeds(feeds);
@@ -766,6 +773,7 @@
private static void checkIfExtensionServiceIsEnabled() {
if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) {
+ LOG.error(ExtensionService.SERVICE_NAME + " is not enabled.");
throw FalconWebException.newAPIException(
ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND);
}
@@ -773,7 +781,8 @@
private static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- if (metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
+ if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
+ LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
}
@@ -783,6 +792,7 @@
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
+ LOG.error("Extension job with name: " + extensionName + " already exists.");
throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
Response.Status.INTERNAL_SERVER_ERROR);
}
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 2a40611..6a65d2c 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -334,6 +334,15 @@
}
@Override
+ public APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser) {
+ try {
+ return localExtensionManager.scheduleExtensionJob(jobName, coloExpr, doAsUser);
+ } catch (FalconException | IOException e) {
+ throw new FalconCLIException("Failed to delete the extension job:" + coloExpr);
+ }
+ }
+
+ @Override
public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser) {
InputStream configStream = getServletInputStream(configPath);
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 1e9b15a..20ccfca 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -40,55 +40,75 @@
* A proxy implementation of the extension operations in local mode.
*/
public class LocalExtensionManager extends AbstractExtensionManager {
- public LocalExtensionManager() {}
+ LocalExtensionManager() {}
- public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config,
- SortedMap<EntityType, List<Entity>> entityMap)
+ APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream,
+ SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
-
- for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
- for(Entity entity : entry.getValue()){
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (Entity entity : entry.getValue()) {
submitInternal(entity, "falconUser");
}
}
+ storeExtension(extensionName, jobName, configStream, entityMap);
+
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}
- public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream,
- SortedMap<EntityType, List<Entity>> entityMap)
+ APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream,
+ SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
- List<String> feedNames = new ArrayList<>();
- List<String> processNames = new ArrayList<>();
for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
for(Entity entity : entry.getValue()){
submitInternal(entity, "falconUser");
}
}
- for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
- for(Entity entity : entry.getValue()){
+ storeExtension(extensionName, jobName, configStream, entityMap);
+
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (Entity entity : entry.getValue()) {
scheduleInternal(entry.getKey().name(), entity.getName(), null, null);
}
}
+ return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"
+ + jobName);
+ }
+
+ private void storeExtension(String extensionName, String jobName, InputStream configStream, SortedMap<EntityType,
+ List<Entity>> entityMap) throws IOException {
byte[] configBytes = null;
if (configStream != null) {
configBytes = IOUtils.toByteArray(configStream);
}
- for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
- for(final Entity entity : entry.getValue()){
- if (entity.getEntityType().equals(EntityType.FEED)){
+ List<String> feedNames = new ArrayList<>();
+ List<String> processNames = new ArrayList<>();
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (final Entity entity : entry.getValue()) {
+ if (entity.getEntityType().equals(EntityType.FEED)) {
feedNames.add(entity.getName());
- }else{
+ } else {
processNames.add(entity.getName());
}
}
}
ExtensionStore.getMetaStore().storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
-
- return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}
- public APIResult deleteExtensionJob(String jobName) throws FalconException, IOException{
+ APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser)
+ throws FalconException, IOException{
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (Entity entity : entry.getValue()) {
+ scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null);
+ }
+ }
+ return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
+ }
+
+ APIResult deleteExtensionJob(String jobName) throws FalconException, IOException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
@@ -101,8 +121,8 @@
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
}
- public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
- SortedMap<EntityType, List<Entity>> entityMap)
+ APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
+ SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
@@ -128,27 +148,27 @@
return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
}
- public APIResult registerExtensionMetadata(String extensionName, String packagePath , String description) {
+ APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) {
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
}
- public APIResult unRegisterExtension(String extensionName) {
+ APIResult unRegisterExtension(String extensionName) {
return super.deleteExtensionMetadata(extensionName);
}
- public APIResult getExtensionJobDetails(String jobName){
+ APIResult getExtensionJobDetails(String jobName){
return super.getExtensionJobDetail(jobName);
}
- public APIResult disableExtension(String extensionName) {
+ APIResult disableExtension(String extensionName) {
return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
}
- public APIResult enableExtension(String extensionName) {
+ APIResult enableExtension(String extensionName) {
return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
}
- public APIResult getExtensionDetails(String extensionName){
+ APIResult getExtensionDetails(String extensionName){
return super.getExtensionDetail(extensionName);
}
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index a41743d..508a7bb 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -444,7 +444,7 @@
createDir(PROCESS_APP_PATH);
copyExtensionJar(packageBuildLib);
- APIResult apiResult = submitAndScheduleExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
+ APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
result = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(result);
@@ -452,6 +452,8 @@
Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testPipeline");
+ apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null);
+ assertStatus(apiResult);
apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
index 2160320..3a6c9c0 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
@@ -17,39 +17,93 @@
*/
package org.apache.falcon.resource;
+import com.sun.jersey.multipart.FormDataBodyPart;
+import com.sun.jersey.multipart.FormDataParam;
import org.apache.falcon.FalconWebException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.POST;
import javax.ws.rs.Consumes;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.util.List;
/**
* This class provides RESTful API for the extensions.
*/
@Path("extension")
-public class ExtensionManager {
- public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class);
+public class ExtensionManager extends AbstractExtensionManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class);
@GET
@Path("enumerate")
@Produces({MediaType.APPLICATION_JSON})
- public Response getExtensions() {
+ public APIResult getExtensions() {
LOG.error("Enumerate is not supported on Server.Please run your operation on Prism ");
throw FalconWebException.newAPIException("Enumerate is not supported on Server. Please run your operation "
+ "on Prism.");
}
+ @POST
+ @Path("schedule/{job-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ public APIResult schedule(@PathParam("job-name") String jobName,
+ @Context HttpServletRequest request,
+ @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ LOG.error("schedule is not supported on Server.Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("schedule is not supported on Server. Please run your operation "
+ + "on Prism.");
+ }
+
+ @POST
+ @Path("submit/{extension-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA,
+ MediaType.APPLICATION_OCTET_STREAM})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ public APIResult submit(
+ @PathParam("extension-name") String extensionName,
+ @Context HttpServletRequest request,
+ @DefaultValue("") @QueryParam("doAs") String doAsUser,
+ @QueryParam("jobName") String jobName,
+ @FormDataParam("processes") List<FormDataBodyPart> processForms,
+ @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+ @FormDataParam("config") InputStream config) {
+ LOG.error("submit is not supported on Server.Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("submit is not supported on Server. Please run your operation "
+ + "on Prism.");
+ }
+
+ @POST
+ @Path("submitAndSchedule/{extension-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ public APIResult submitAndSchedule(
+ @PathParam("extension-name") String extensionName,
+ @Context HttpServletRequest request,
+ @DefaultValue("") @QueryParam("doAs") String doAsUser,
+ @QueryParam("jobName") String jobName,
+ @FormDataParam("processes") List<FormDataBodyPart> processForms,
+ @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+ @FormDataParam("config") InputStream config) {
+ LOG.error("submitAndSchedule is not supported on Server.Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("submitAndSchedule is not supported on Server. Please run your "
+ + "operation on Prism.");
+ }
+
@GET
@Path("describe/{extension-name}")
@Produces(MediaType.TEXT_PLAIN)
- public String getExtensionDescription(
+ public APIResult getExtensionDescription(
@PathParam("extension-name") String extensionName) {
LOG.error("Describe is not supported on Server.Please run your operation on Prism ");
throw FalconWebException.newAPIException("Describe is not supported on Server. Please run your operation "
@@ -59,7 +113,7 @@
@GET
@Path("detail/{extension-name}")
@Produces({MediaType.APPLICATION_JSON})
- public Response getDetail(@PathParam("extension-name") String extensionName) {
+ public APIResult getDetail(@PathParam("extension-name") String extensionName) {
LOG.error("Detail is not supported on Server.Please run your operation on Prism ");
throw FalconWebException.newAPIException("Detail is not supported on Server. Please run your operation "
+ "on Prism.");
@@ -69,8 +123,8 @@
@Path("unregister/{extension-name}")
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces(MediaType.TEXT_PLAIN)
- public String deleteExtensionMetadata(
- @PathParam("extension-name") String extensionName){
+ public APIResult deleteExtensionMetadata(
+ @PathParam("extension-name") String extensionName) {
LOG.error("Unregister is not supported on Server.Please run your operation on Prism ");
throw FalconWebException.newAPIException("Unregister is not supported on Server. Please run your operation "
+ "on Prism.");
@@ -79,7 +133,7 @@
@GET
@Path("definition/{extension-name}")
@Produces({MediaType.APPLICATION_JSON})
- public String getExtensionDefinition(
+ public APIResult getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
LOG.error("Definition is not supported on Server.Please run your operation on Prism ");
throw FalconWebException.newAPIException("Definition is not supported on Server. Please run your operation "