FALCON-2231 Changes to support Schedule of user extensions

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi,@pracheer,@praveen

Closes #334 from sandeepSamudrala/FALCON-2231 and squashes the following commits:

d32bf98 [sandeep] FALCON-2231 Fixed checkstyle issues.
2bbd7e2 [sandeep] FALCON-2231 Incoporated review comments and fixed test cases
2269806 [sandeep] FALCON-2231 Incoporated review comments and small fixes for duplicate submission and colo addition to schedule command
44d6f2a [sandeep] FALCON-2231 Corrected message in LocalExtensionManager
f165282 [sandeep] FALCON-2231 Updated Error messages and throwing out exception in case of extension not found while scheduling
96a9a1d [sandeep] FALCON-2231 Rebased my patch
ca320e0 [sandeep] FACLON-2231 Changes to support Schedule of user extensions
53831ea [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2231
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
index a8aea52..dcac8e8 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
@@ -369,7 +369,7 @@
         OUT.get().println(result);
     }
 
-    private void validateColo(Set<String> optionsList) {
+    static void validateColo(Set<String> optionsList) {
         if (optionsList.contains(FalconCLIConstants.COLO_OPT)) {
             throw new FalconCLIException("Invalid argument : " + FalconCLIConstants.COLO_OPT);
         }
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 0343aa8..2a105dc 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -33,38 +33,44 @@
 import org.apache.falcon.resource.ExtensionInstanceList;
 import org.apache.falcon.resource.ExtensionJobList;
 
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.falcon.cli.FalconEntityCLI.validateColo;
+import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT;
+import static org.apache.falcon.client.FalconCLIConstants.COLO_OPT_DESCRIPTION;
+
 /**
  * Falcon extensions Command Line Interface - wraps the RESTful API for extensions.
  */
-public class FalconExtensionCLI {
+public class FalconExtensionCLI extends FalconCLI{
     public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out);
 
     // Extension commands
-    public static final String ENUMERATE_OPT = "enumerate";
-    public static final String DEFINITION_OPT = "definition";
-    public static final String DESCRIBE_OPT = "describe";
-    public static final String INSTANCES_OPT = "instances";
-    public static final String UNREGISTER_OPT = "unregister";
-    public static final String DETAIL_OPT = "detail";
-    public static final String REGISTER_OPT = "register";
-    public static final String ENABLE_OPT = "enable";
-    public static final String DISABLE_OPT = "disable";
+    private static final String ENUMERATE_OPT = "enumerate";
+    private static final String DEFINITION_OPT = "definition";
+    private static final String DESCRIBE_OPT = "describe";
+    private static final String INSTANCES_OPT = "instances";
+    private static final String UNREGISTER_OPT = "unregister";
+    private static final String DETAIL_OPT = "detail";
+    private static final String REGISTER_OPT = "register";
+    private static final String ENABLE_OPT = "enable";
+    private static final String DISABLE_OPT = "disable";
 
     // Input parameters
-    public static final String EXTENSION_NAME_OPT = "extensionName";
-    public static final String JOB_NAME_OPT = "jobName";
+    private static final String EXTENSION_NAME_OPT = "extensionName";
+    private static final String JOB_NAME_OPT = "jobName";
     public static final String DESCRIPTION = "description";
-    public static final String PATH = "path";
+    private static final String PATH = "path";
 
-    public FalconExtensionCLI() {
+    FalconExtensionCLI() throws Exception {
+        super();
     }
 
-    public void extensionCommand(CommandLine commandLine, FalconClient client) {
+    void extensionCommand(CommandLine commandLine, FalconClient client) throws IOException {
         Set<String> optionsList = new HashSet<>();
         for (Option option : commandLine.getOptions()) {
             optionsList.add(option.getOpt());
@@ -77,6 +83,8 @@
         String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
         String path = commandLine.getOptionValue(FalconCLIConstants.PATH);
         String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION);
+        String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT);
+        colo = getColo(colo);
 
         if (optionsList.contains(ENUMERATE_OPT)) {
             result = client.enumerateExtensions().getMessage();
@@ -105,6 +113,7 @@
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(jobName, JOB_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+            validateColo(optionsList);
             result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(REGISTER_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
@@ -114,6 +123,7 @@
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(jobName, JOB_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
+            validateColo(optionsList);
             result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
@@ -125,7 +135,8 @@
             result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
-            result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();
+            colo = getColo(colo);
+            result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
             result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
@@ -170,7 +181,7 @@
         OUT.get().println(result);
     }
 
-    public Options createExtensionOptions() {
+    Options createExtensionOptions() {
         Options extensionOptions = new Options();
 
         Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions");
@@ -192,6 +203,8 @@
         Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension");
         Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. This will "
                 + "make the extension available for instantiation for all users.");
+        Option colo = new Option(COLO_OPT, true, COLO_OPT_DESCRIPTION);
+        colo.setRequired(false);
 
         OptionGroup group = new OptionGroup();
         group.addOption(enumerate);
@@ -249,6 +262,7 @@
         extensionOptions.addOption(filePath);
         extensionOptions.addOption(path);
         extensionOptions.addOption(description);
+        extensionOptions.addOption(colo);
 
         return extensionOptions;
     }
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 3181b64..7b8a606 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -222,6 +222,13 @@
                                                 String doAsUser);
 
     /**
+     * Schedules the set of entities that are part of the extension.
+     * @param jobName extensionJob that needs to be scheduled.
+     * @return APIResult stating status of scheduling the extension.
+     */
+    public abstract APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser);
+
+    /**
      * Prepares set of entities the extension has implemented and stage them to a local directory and submits and
      * schedules them.
      * @param extensionName extension which is available in the store.
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 8401c9c..2772085 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1200,9 +1200,10 @@
         }
     }
 
-    public APIResult scheduleExtensionJob(final String jobName, final String doAsUser)  {
+    public APIResult scheduleExtensionJob(String jobName, final String coloExpr, final String doAsUser)  {
         ClientResponse clientResponse = new ResourceBuilder()
                 .path(ExtensionOperations.SCHEDULE.path, jobName)
+                .addQueryParam(COLO, coloExpr)
                 .addQueryParam(DO_AS_OPT, doAsUser)
                 .call(ExtensionOperations.SCHEDULE);
         return getResponse(APIResult.class, clientResponse);
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index e53069a..9126b67 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -19,6 +19,7 @@
 
 import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.extensions.ExtensionType;
+import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.persistence.ExtensionBean;
 import org.apache.falcon.persistence.ExtensionJobsBean;
 import org.apache.falcon.persistence.PersistenceConstants;
@@ -145,6 +146,11 @@
 
     public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes,
                                   byte[] config) {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        boolean alreadySubmitted = false;
+        if (metaStore.getExtensionJobDetails(jobName) != null){
+            alreadySubmitted = true;
+        }
         ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
         Date currentTime = new Date(System.currentTimeMillis());
         extensionJobsBean.setJobName(jobName);
@@ -157,7 +163,11 @@
         EntityManager entityManager = getEntityManager();
         try {
             beginTransaction(entityManager);
-            entityManager.persist(extensionJobsBean);
+            if (alreadySubmitted) {
+                entityManager.merge(extensionJobsBean);
+            } else {
+                entityManager.persist(extensionJobsBean);
+            }
         } finally {
             commitAndCloseTransaction(entityManager);
         }
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
index 1688abb..e3327e8 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
@@ -83,6 +83,8 @@
 
         byte[] config = new byte[0];
         stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);
+        //storing again to check for entity manager merge to let submission go forward.
+        stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);
 
         Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
         Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 81b0448..8daf8c7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -128,12 +128,21 @@
     }
 
     protected Set<String> getColosFromExpression(String coloExpr, String type, String entity) {
-        Set<String> colos;
         final Set<String> applicableColos = getApplicableColos(type, entity);
+        return getColosFromExpression(coloExpr, applicableColos);
+    }
+
+    protected Set<String> getColosFromExpression(String coloExpr, String type, Entity entity) {
+        final Set<String> applicableColos = getApplicableColos(type, entity);
+        return getColosFromExpression(coloExpr, applicableColos);
+    }
+
+    private Set<String> getColosFromExpression(String coloExpr, Set<String> applicableColos) {
+        Set<String> colos;
         if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) {
             colos = applicableColos;
         } else {
-            colos = new HashSet<String>(Arrays.asList(coloExpr.split(",")));
+            colos = new HashSet<>(Arrays.asList(coloExpr.split(",")));
             if (!applicableColos.containsAll(colos)) {
                 throw FalconWebException.newAPIException("Given colos not applicable for entity operation");
             }
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 2b5cbe7..6f75dc7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -191,21 +191,26 @@
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
     public APIResult schedule(@PathParam("job-name") String jobName,
+                              @Context HttpServletRequest request,
+                              @QueryParam("colo") final String coloExpr,
                               @DefaultValue("") @QueryParam("doAs") String doAsUser) {
         checkIfExtensionServiceIsEnabled();
         checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName());
-        try {
-            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
-            }
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+        if (extensionJobsBean == null) {
+            // return failure if the extension job doesn't exist
+            LOG.error("Extension Job not found:" + jobName);
+            throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName,
+                    Response.Status.NOT_FOUND);
+        }
 
-            for (Entity entity : entities) {
-                scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null);
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when scheduling extension job: " + jobName + ": ", e);
+        SortedMap<EntityType, List<Entity>> entityMap;
+        try {
+            entityMap = getJobEntities(extensionJobsBean);
+            scheduleEntities(entityMap, request, coloExpr);
+        } catch (FalconException | IOException | JAXBException e) {
+            LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
@@ -375,6 +380,7 @@
             @Context HttpServletRequest request,
             @DefaultValue("") @QueryParam("doAs") String doAsUser,
             @QueryParam("jobName") String jobName,
+            @QueryParam("colo") final String coloExpr,
             @FormDataParam("processes") List<FormDataBodyPart> processForms,
             @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
@@ -385,7 +391,7 @@
         try {
             entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
             submitEntities(extensionName, jobName, entityMap, config, request);
-            scheduleEntities(entityMap, request);
+            scheduleEntities(entityMap, request, coloExpr);
         } catch (FalconException | IOException | JAXBException e) {
             LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
@@ -393,13 +399,13 @@
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully");
     }
 
-    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request)
+    private void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request, String coloExpr)
         throws FalconException, JAXBException, IOException {
         for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
             for (final Entity entity : entry.getValue()) {
                 final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request);
                 final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
-                final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
+                final Set<String> colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity);
 
                 new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
                     @Override
@@ -441,8 +447,9 @@
     }
 
     private void submitEntities(String extensionName, String jobName,
-                                  SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
-                                  HttpServletRequest request) throws FalconException, IOException, JAXBException {
+                                SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
+                                HttpServletRequest request)
+        throws FalconException, IOException, JAXBException {
         List<Entity> feeds = entityMap.get(EntityType.FEED);
         List<Entity> processes = entityMap.get(EntityType.PROCESS);
         validateFeeds(feeds);
@@ -766,6 +773,7 @@
 
     private static void checkIfExtensionServiceIsEnabled() {
         if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) {
+            LOG.error(ExtensionService.SERVICE_NAME + " is not enabled.");
             throw FalconWebException.newAPIException(
                     ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND);
         }
@@ -773,7 +781,8 @@
 
     private static void checkIfExtensionIsEnabled(String extensionName) {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        if (metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
+        if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
+            LOG.error("Extension: " + extensionName + " is in disabled state.");
             throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
                     Response.Status.INTERNAL_SERVER_ERROR);
         }
@@ -783,6 +792,7 @@
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
         if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
+            LOG.error("Extension job with name: " + extensionName + " already exists.");
             throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
                     Response.Status.INTERNAL_SERVER_ERROR);
         }
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 2a40611..6a65d2c 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -334,6 +334,15 @@
     }
 
     @Override
+    public APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser) {
+        try {
+            return localExtensionManager.scheduleExtensionJob(jobName, coloExpr, doAsUser);
+        } catch (FalconException | IOException e) {
+            throw new FalconCLIException("Failed to delete the extension job:" + coloExpr);
+        }
+    }
+
+    @Override
     public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
                                                    String doAsUser) {
         InputStream configStream = getServletInputStream(configPath);
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 1e9b15a..20ccfca 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -40,55 +40,75 @@
  * A proxy implementation of the extension operations in local mode.
  */
 public class LocalExtensionManager extends AbstractExtensionManager {
-    public LocalExtensionManager() {}
+    LocalExtensionManager() {}
 
-    public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config,
-                                        SortedMap<EntityType, List<Entity>> entityMap)
+    APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream,
+                                 SortedMap<EntityType, List<Entity>> entityMap)
         throws FalconException, IOException {
-
-        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
-            for(Entity entity : entry.getValue()){
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (Entity entity : entry.getValue()) {
                 submitInternal(entity, "falconUser");
             }
         }
+        storeExtension(extensionName, jobName, configStream, entityMap);
+
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
     }
 
-    public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream,
-                                                      SortedMap<EntityType, List<Entity>> entityMap)
+    APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream,
+                                               SortedMap<EntityType, List<Entity>> entityMap)
         throws FalconException, IOException {
-        List<String> feedNames = new ArrayList<>();
-        List<String> processNames = new ArrayList<>();
         for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
             for(Entity entity : entry.getValue()){
                 submitInternal(entity, "falconUser");
             }
         }
 
-        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
-            for(Entity entity : entry.getValue()){
+        storeExtension(extensionName, jobName, configStream, entityMap);
+
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (Entity entity : entry.getValue()) {
                 scheduleInternal(entry.getKey().name(), entity.getName(), null, null);
             }
         }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"
+                + jobName);
+    }
+
+    private void storeExtension(String extensionName, String jobName, InputStream configStream, SortedMap<EntityType,
+            List<Entity>> entityMap) throws IOException {
         byte[] configBytes = null;
         if (configStream != null) {
             configBytes = IOUtils.toByteArray(configStream);
         }
-        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
-            for(final Entity entity : entry.getValue()){
-                if (entity.getEntityType().equals(EntityType.FEED)){
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (final Entity entity : entry.getValue()) {
+                if (entity.getEntityType().equals(EntityType.FEED)) {
                     feedNames.add(entity.getName());
-                }else{
+                } else {
                     processNames.add(entity.getName());
                 }
             }
         }
         ExtensionStore.getMetaStore().storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
-
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
     }
 
-    public APIResult deleteExtensionJob(String jobName) throws FalconException, IOException{
+    APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser)
+        throws FalconException, IOException{
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+        SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (Entity entity : entry.getValue()) {
+                scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null);
+            }
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
+    }
+
+    APIResult deleteExtensionJob(String jobName) throws FalconException, IOException {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
         SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
@@ -101,8 +121,8 @@
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
     }
 
-    public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
-                                        SortedMap<EntityType, List<Entity>> entityMap)
+    APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
+                                 SortedMap<EntityType, List<Entity>> entityMap)
         throws FalconException, IOException {
         List<String> feedNames = new ArrayList<>();
         List<String> processNames = new ArrayList<>();
@@ -128,27 +148,27 @@
         return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
     }
 
-    public APIResult registerExtensionMetadata(String extensionName, String packagePath , String description) {
+    APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) {
         return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
     }
 
-    public APIResult unRegisterExtension(String extensionName) {
+    APIResult unRegisterExtension(String extensionName) {
         return super.deleteExtensionMetadata(extensionName);
     }
 
-    public APIResult getExtensionJobDetails(String jobName){
+    APIResult getExtensionJobDetails(String jobName){
         return super.getExtensionJobDetail(jobName);
     }
 
-    public APIResult disableExtension(String extensionName) {
+    APIResult disableExtension(String extensionName) {
         return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
     }
 
-    public APIResult enableExtension(String extensionName) {
+    APIResult enableExtension(String extensionName) {
         return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
     }
 
-    public APIResult getExtensionDetails(String extensionName){
+    APIResult getExtensionDetails(String extensionName){
         return super.getExtensionDetail(extensionName);
     }
 
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index a41743d..508a7bb 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -444,7 +444,7 @@
 
         createDir(PROCESS_APP_PATH);
         copyExtensionJar(packageBuildLib);
-        APIResult apiResult = submitAndScheduleExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
+        APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
         assertStatus(apiResult);
         result = getExtensionJobDetails(TEST_JOB);
         JSONObject resultJson = new JSONObject(result);
@@ -452,6 +452,8 @@
         Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
         Assert.assertEquals(process.getPipelines(), "testPipeline");
 
+        apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null);
+        assertStatus(apiResult);
         apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
         assertStatus(apiResult);
         Assert.assertEquals(apiResult.getMessage(), "RUNNING");
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
index 2160320..3a6c9c0 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
@@ -17,39 +17,93 @@
  */
 package org.apache.falcon.resource;
 
+import com.sun.jersey.multipart.FormDataBodyPart;
+import com.sun.jersey.multipart.FormDataParam;
 import org.apache.falcon.FalconWebException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.POST;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.util.List;
 
 /**
  * This class provides RESTful API for the extensions.
  */
 @Path("extension")
-public class ExtensionManager {
-    public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class);
+public class ExtensionManager extends AbstractExtensionManager {
+    private static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class);
 
     @GET
     @Path("enumerate")
     @Produces({MediaType.APPLICATION_JSON})
-    public Response getExtensions() {
+    public APIResult getExtensions() {
         LOG.error("Enumerate is not supported on Server.Please run your operation on Prism ");
         throw FalconWebException.newAPIException("Enumerate is not supported on Server. Please run your operation "
                 + "on Prism.");
     }
 
+    @POST
+    @Path("schedule/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult schedule(@PathParam("job-name") String jobName,
+                              @Context HttpServletRequest request,
+                              @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        LOG.error("schedule is not supported on Server.Please run your operation on Prism ");
+        throw FalconWebException.newAPIException("schedule is not supported on Server. Please run your operation "
+                + "on Prism.");
+    }
+
+    @POST
+    @Path("submit/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA,
+            MediaType.APPLICATION_OCTET_STREAM})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult submit(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser,
+            @QueryParam("jobName") String jobName,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+            @FormDataParam("config") InputStream config) {
+        LOG.error("submit is not supported on Server.Please run your operation on Prism ");
+        throw FalconWebException.newAPIException("submit is not supported on Server. Please run your operation "
+                + "on Prism.");
+    }
+
+    @POST
+    @Path("submitAndSchedule/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult submitAndSchedule(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser,
+            @QueryParam("jobName") String jobName,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+            @FormDataParam("config") InputStream config) {
+        LOG.error("submitAndSchedule is not supported on Server.Please run your operation on Prism ");
+        throw FalconWebException.newAPIException("submitAndSchedule is not supported on Server. Please run your "
+                + "operation on Prism.");
+    }
+
     @GET
     @Path("describe/{extension-name}")
     @Produces(MediaType.TEXT_PLAIN)
-    public String getExtensionDescription(
+    public APIResult getExtensionDescription(
             @PathParam("extension-name") String extensionName) {
         LOG.error("Describe is not supported on Server.Please run your operation on Prism ");
         throw FalconWebException.newAPIException("Describe is not supported on Server. Please run your operation "
@@ -59,7 +113,7 @@
     @GET
     @Path("detail/{extension-name}")
     @Produces({MediaType.APPLICATION_JSON})
-    public Response getDetail(@PathParam("extension-name") String extensionName) {
+    public APIResult getDetail(@PathParam("extension-name") String extensionName) {
         LOG.error("Detail is not supported on Server.Please run your operation on Prism ");
         throw FalconWebException.newAPIException("Detail is not supported on Server. Please run your operation "
                 + "on Prism.");
@@ -69,8 +123,8 @@
     @Path("unregister/{extension-name}")
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
     @Produces(MediaType.TEXT_PLAIN)
-    public String deleteExtensionMetadata(
-            @PathParam("extension-name") String extensionName){
+    public APIResult deleteExtensionMetadata(
+            @PathParam("extension-name") String extensionName) {
         LOG.error("Unregister is not supported on Server.Please run your operation on Prism ");
         throw FalconWebException.newAPIException("Unregister is not supported on Server. Please run your operation "
                 + "on Prism.");
@@ -79,7 +133,7 @@
     @GET
     @Path("definition/{extension-name}")
     @Produces({MediaType.APPLICATION_JSON})
-    public String getExtensionDefinition(
+    public APIResult getExtensionDefinition(
             @PathParam("extension-name") String extensionName) {
         LOG.error("Definition is not supported on Server.Please run your operation on Prism ");
         throw FalconWebException.newAPIException("Definition is not supported on Server. Please run your operation "