FALCON-2200 Update API support for extension job (user extension)

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #331 from sandeepSamudrala/FALCON-2200 and squashes the following commits:

737fad3 [sandeep] FALCON-2200 fixed checkstyle issues. removed unused imports
1780416 [sandeep] Incorporated review comments. Removed entitychannel and config channel from ExtensionManager Proxy as they are now used from proxyUtil
8a4d035 [sandeep] FALCON-2200 Incorporated review comments. Moved common code from proxies to proxyutil and making 2 api calls to get location in case of update extension
c8d0ab7 [sandeep] FALCON-2200 Adding changes related to clusters being removed and clusters being added into entity definition
cc7c9e9 [sandeep] FALCON-2200 Update API support for extension job (user extension)
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 59538bc..aa436da 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -110,12 +110,13 @@
             result = client.registerExtension(extensionName, path, description).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+            validateRequiredParameter(jobName, JOB_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
             result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
-            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+            validateRequiredParameter(jobName, JOB_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
-            result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
+            result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
index 11122e2..5e2f26d 100644
--- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -54,6 +54,7 @@
     private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
     private static final String LOCATION = "location";
     private static final String TYPE = "type";
+    private static final String NAME = "name";
     private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
             "META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";
 
@@ -220,4 +221,14 @@
         }
         return extensionType;
     }
+
+    public static  String getExtensionName(String jobName, JSONObject extensionJobDetailJson) {
+        String extensionType;
+        try {
+            extensionType = extensionJobDetailJson.get(NAME).toString();
+        } catch (JSONException e) {
+            throw new FalconCLIException("Failed to get extension name for the given extension job:" + jobName, e);
+        }
+        return extensionType;
+    }
 }
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 879d794..8cdbf30 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -222,6 +222,16 @@
                                                  String doAsUser);
 
     /**
+     * Prepares set of entities the extension has implemented and stage them to a local directory and updates them.
+     * @param jobName name to be used in all the extension entities' tagging that are built as part of
+     *                           loadAndPrepare.
+     * @param configPath path to extension parameters.
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);
+
+    /**
      *  Prepares set of entities the extension has implemented to validate the extension job.
      * @param jobName job name of the extension job.
      * @return
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index e03e82d..9adb142 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1028,9 +1028,12 @@
     }
 
     public APIResult getExtensionJobDetails(final String jobName) {
-        ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
+        return getResponse(APIResult.class, getExtensionJobDetailsResponse(jobName));
+    }
+
+    private ClientResponse getExtensionJobDetailsResponse(final String jobName) {
+        return new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
                 .call(ExtensionOperations.JOB_DETAILS);
-        return getResponse(APIResult.class, clientResponse);
     }
 
     private ClientResponse getExtensionDetailResponse(final String extensionName) {
@@ -1097,7 +1100,11 @@
 
     private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
                                                          InputStream configStream) {
-        JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
+        JSONObject extensionDetailJson;
+        if (StringUtils.isBlank(extensionName)) {
+            extensionName = ExtensionHandler.getExtensionName(jobName, getExtensionJobDetailJson(jobName));
+        }
+        extensionDetailJson = getExtensionDetailJson(extensionName);
         String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
         String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
         return getEntities(extensionName, jobName, configStream, extensionType,
@@ -1115,6 +1122,16 @@
         }
         return extensionDetailJson;
     }
+    private JSONObject getExtensionJobDetailJson(String jobName) {
+        ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
+        JSONObject extensionJobDetailJson;
+        try {
+            extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
+        } catch (JSONException e) {
+            throw new FalconCLIException("Failed to get details for the given extension", e);
+        }
+        return extensionJobDetailJson;
+    }
 
     private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
                                      String extensionType, String extensionBuildLocation) {
@@ -1144,12 +1161,12 @@
         return getResponse(APIResult.class, clientResponse);
     }
 
-    public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser) {
-        InputStream entityStream = getServletInputStream(filePath);
+    public APIResult updateExtensionJob(final String jobName, final String configPath, final String doAsUser) {
+        FormDataMultiPart entitiesForm = getEntitiesForm(null, jobName, configPath);
         ClientResponse clientResponse = new ResourceBuilder()
-                .path(ExtensionOperations.UPDATE.path, extensionName)
+                .path(ExtensionOperations.UPDATE.path, jobName)
                 .addQueryParam(DO_AS_OPT, doAsUser)
-                .call(ExtensionOperations.UPDATE, entityStream);
+                .call(ExtensionOperations.UPDATE, entitiesForm);
         return getResponse(APIResult.class, clientResponse);
     }
 
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 6ad887e..cb8c816 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -272,12 +272,12 @@
             "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
 
     private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
-        String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+        String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
 
         PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
         writer.println();
         writer.println(CREATE_FALCON_DB_PROPS);
-        writer.println(insertDbVerion);
+        writer.println(insertDbVersion);
         writer.close();
         System.out.println("Create FALCON_DB_PROPS table");
         if (run) {
@@ -287,7 +287,7 @@
                 conn.setAutoCommit(true);
                 st = conn.createStatement();
                 st.executeUpdate(CREATE_FALCON_DB_PROPS);
-                st.executeUpdate(insertDbVerion);
+                st.executeUpdate(insertDbVersion);
                 st.close();
             } catch (Exception ex) {
                 closeStatement(st);
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 277cb95..03f98f6 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -172,6 +172,23 @@
         }
     }
 
+    public void updateExtensionJob(String jobName, String extensionName, List<String> feedNames,
+                                   List<String> processNames, byte[] configBytes) {
+        EntityManager entityManager = getEntityManager();
+        ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
+        extensionJobsBean.setJobName(jobName);
+        extensionJobsBean.setExtensionName(extensionName);
+        extensionJobsBean.setFeeds(feedNames);
+        extensionJobsBean.setProcesses(processNames);
+        extensionJobsBean.setConfig(configBytes);
+        try {
+            beginTransaction(entityManager);
+            entityManager.merge(extensionJobsBean);
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
     public ExtensionJobsBean getExtensionJobDetails(String jobName) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
diff --git a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
index 432e37b..d3de2e6 100644
--- a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
+++ b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
@@ -21,29 +21,44 @@
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Schema;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.ExtensionBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.JAXBException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Extension Example for testing extension loading and preparing entities.
  */
 public class ExtensionExample implements ExtensionBuilder{
 
+    public static final Logger LOG = LoggerFactory.getLogger(ExtensionExample.class);
     public static final String PROCESS_XML = "/extension-example.xml";
 
     @Override
     public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException {
-        Entity process;
+        Process process;
         try {
-            process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal(
+            process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
                     getClass().getResourceAsStream(PROCESS_XML));
         } catch (JAXBException e) {
             throw new FalconException("Failed in un-marshalling the entity");
         }
+        if (extensionConfigStream != null) {
+            Properties properties = new Properties();
+            try {
+                properties.load(extensionConfigStream);
+            } catch (IOException e) {
+                LOG.warn("Not able to load the configStream");
+            }
+            process.setPipelines(properties.getProperty("pipelines.name"));
+        }
         List<Entity> entities = new ArrayList<>();
         entities.add(process);
         return entities;
@@ -52,7 +67,6 @@
     @Override
     public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream)
         throws FalconException {
-
     }
 
     @Override
diff --git a/extensions/src/test/resources/extension-example.xml b/extensions/src/test/resources/extension-example.xml
index 4a2a982..bb391e4 100644
--- a/extensions/src/test/resources/extension-example.xml
+++ b/extensions/src/test/resources/extension-example.xml
@@ -27,7 +27,6 @@
     <parallel>1</parallel>
     <order>LIFO</order>
     <frequency>hours(1)</frequency>
-    <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
     <!-- how -->
     <properties>
         <property name="name1" value="value1"/>
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
index 621974d..8bb8bbb 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -82,7 +82,7 @@
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
         q.setParameter("entityName", entityName);
-        try{
+        try {
             q.executeUpdate();
         } finally {
             commitAndCloseTransaction(entityManager);
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index aefd699..81b0448 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -116,7 +116,7 @@
         }
     }
 
-    protected Set<String> getAllColos() {
+    public static Set<String> getAllColos() {
         if (DeploymentUtil.isEmbeddedMode()) {
             return DeploymentUtil.getDefaultColos();
         }
@@ -141,7 +141,7 @@
         return colos;
     }
 
-    protected Set<String> getApplicableColos(String type, String name) {
+    public static Set<String> getApplicableColos(String type, String name) {
         try {
             if (DeploymentUtil.isEmbeddedMode()) {
                 return DeploymentUtil.getDefaultColos();
@@ -157,7 +157,7 @@
         }
     }
 
-    protected Set<String> getApplicableColos(String type, Entity entity) {
+    public static Set<String> getApplicableColos(String type, Entity entity) {
         try {
             if (DeploymentUtil.isEmbeddedMode()) {
                 return DeploymentUtil.getDefaultColos();
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index d360370..9fb0dd4 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -42,7 +42,7 @@
     public static final Logger LOG = LoggerFactory.getLogger(AbstractExtensionManager.class);
 
     private static final String JOB_NAME = "jobName";
-    public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
+    protected static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
     private static final String EXTENSION_NAME = "extensionName";
     private static final String FEEDS = "feeds";
     private static final String PROCESSES = "processes";
@@ -50,19 +50,19 @@
     private static final String CREATION_TIME  = "creationTime";
     private static final String LAST_UPDATE_TIME  = "lastUpdatedTime";
 
-    private static final String NAME = "name";
-    private static final String EXTENSION_TYPE = "type";
-    private static final String EXTENSION_DESC = "description";
-    private static final String EXTENSION_LOCATION = "location";
+    public static final String NAME = "name";
+    protected static final String EXTENSION_TYPE = "type";
+    protected static final String EXTENSION_DESC = "description";
+    protected static final String EXTENSION_LOCATION = "location";
 
-    public static void validateExtensionName(final String extensionName) {
+    protected static void validateExtensionName(final String extensionName) {
         if (StringUtils.isBlank(extensionName)) {
             throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank",
                     Response.Status.BAD_REQUEST);
         }
     }
 
-    public APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
+    protected APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
         validateExtensionName(extensionName);
         try {
             return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().registerExtension(extensionName, path,
@@ -80,7 +80,7 @@
         }
     }
 
-    public APIResult getExtensionDetail(String extensionName) {
+    protected APIResult getExtensionDetail(String extensionName) {
         try {
             return new APIResult(APIResult.Status.SUCCEEDED, buildExtensionDetailResult(extensionName).toString());
         } catch (FalconException e) {
@@ -112,6 +112,7 @@
         if (jobsBean == null) {
             throw new ValidationException("Job name not found:" + jobName);
         }
+        ExtensionBean extensionBean = metaStore.getDetail(jobsBean.getExtensionName());
         JSONObject detailsObject = new JSONObject();
         try {
             detailsObject.put(JOB_NAME, jobsBean.getJobName());
@@ -121,6 +122,8 @@
             detailsObject.put(CONFIG, jobsBean.getConfig());
             detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
             detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
+            detailsObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
+            detailsObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
         } catch (JSONException e) {
             LOG.error("Exception while building extension jon details for job {}", jobName, e);
         }
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
new file mode 100644
index 0000000..a07a6d4
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.proxy;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.channel.Channel;
+import org.apache.falcon.resource.channel.ChannelFactory;
+import org.apache.falcon.util.DeploymentUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.falcon.resource.AbstractEntityManager.getAllColos;
+import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos;
+import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG;
+
+class EntityProxyUtil {
+    private final Map<String, Channel> entityManagerChannels = new HashMap<>();
+    private final Map<String, Channel> configSyncChannels = new HashMap<>();
+
+    EntityProxyUtil() {
+        try {
+            Set<String> colos = getAllColos();
+
+            for (String colo : colos) {
+                initializeFor(colo);
+            }
+
+            DeploymentUtil.setPrismMode();
+        } catch (FalconException e) {
+            throw new FalconRuntimException("Unable to initialize channels", e);
+        }
+    }
+    private void initializeFor(String colo) throws FalconException {
+        entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
+        configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
+    }
+
+    Channel getConfigSyncChannel(String colo) throws FalconException {
+        if (!configSyncChannels.containsKey(colo)) {
+            initializeFor(colo);
+        }
+        return configSyncChannels.get(colo);
+    }
+
+    Channel getEntityManager(String colo) throws FalconException {
+        if (!entityManagerChannels.containsKey(colo)) {
+            initializeFor(colo);
+        }
+        return entityManagerChannels.get(colo);
+    }
+
+    Map<String, APIResult> proxySubmit(final String type, final HttpServletRequest bufferedRequest,
+                                       final Entity entity, final Set<String> colos) {
+        Map<String, APIResult> results = new HashMap<>();
+        results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return colos;
+            }
+
+            @Override
+            protected APIResult doExecute(String colo) throws FalconException {
+                return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
+            }
+        }.execute());
+        return results;
+    }
+
+    Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
+                                       final HttpServletRequest bufferedRequest, Entity newEntity) {
+        final Set<String> oldColos = getApplicableColos(type, entityName);
+        final Set<String> newColos = getApplicableColos(type, newEntity);
+        final Set<String> mergedColos = new HashSet<>();
+        mergedColos.addAll(oldColos);
+        mergedColos.retainAll(newColos);    //Common colos where update should be called
+        newColos.removeAll(oldColos);   //New colos where submit should be called
+        oldColos.removeAll(mergedColos);   //Old colos where delete should be called
+
+        Map<String, APIResult> results = new HashMap<>();
+        if (!oldColos.isEmpty()) {
+            results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) {
+                @Override
+                protected Set<String> getColosToApply() {
+                    return oldColos;
+                }
+
+                @Override
+                protected APIResult doExecute(String colo) throws FalconException {
+                    return getConfigSyncChannel(colo).invoke("delete", bufferedRequest,
+                            type, entityName, colo);
+                }
+            }.execute());
+        }
+
+        if (!mergedColos.isEmpty()) {
+            results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) {
+                @Override
+                protected Set<String> getColosToApply() {
+                    return mergedColos;
+                }
+
+                @Override
+                protected APIResult doExecute(String colo) throws FalconException {
+                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest,
+                            type, entityName,
+                            colo, skipDryRun);
+                }
+            }.execute());
+        }
+
+        if (!newColos.isEmpty()) {
+            results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) {
+                @Override
+                protected Set<String> getColosToApply() {
+                    return newColos;
+                }
+
+                @Override
+                protected APIResult doExecute(String colo) throws FalconException {
+                    return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type,
+                            colo);
+                }
+            }.execute());
+        }
+        return results;
+    }
+}
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 343ef6c..0e79f12 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -36,14 +36,13 @@
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.persistence.ExtensionBean;
+import org.apache.falcon.persistence.ExtensionJobsBean;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.ExtensionInstanceList;
 import org.apache.falcon.resource.ExtensionJobList;
-import org.apache.falcon.resource.channel.Channel;
-import org.apache.falcon.resource.channel.ChannelFactory;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.DeploymentUtil;
@@ -95,8 +94,7 @@
 
     private boolean embeddedMode = DeploymentUtil.isEmbeddedMode();
     private String currentColo = DeploymentUtil.getCurrentColo();
-    private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>();
-    private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>();
+    private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();
 
 
     private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json";
@@ -126,7 +124,7 @@
             // sort by extension job name
             List<String> jobNames = new ArrayList<>(groupedEntities.keySet());
             switch (sortOrder.toLowerCase()) {
-            case DESCENDING_SORT_ORDER :
+            case DESCENDING_SORT_ORDER:
                 Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
                 break;
             default:
@@ -325,9 +323,9 @@
     }
 
     private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName, String jobName,
-                                                        List<FormDataBodyPart> feedForms,
-                                                        List<FormDataBodyPart> processForms, InputStream config)
-        throws FalconException, IOException{
+                                                              List<FormDataBodyPart> feedForms,
+                                                              List<FormDataBodyPart> processForms, InputStream config)
+        throws FalconException, IOException {
         List<Entity> processes = getProcesses(processForms);
         List<Entity> feeds = getFeeds(feedForms);
         ExtensionType extensionType = getExtensionType(extensionName);
@@ -362,11 +360,10 @@
         return extensionDetails.getExtensionType();
     }
 
-    private Channel getEntityManager(String colo) throws FalconException {
-        if (!entityManagerChannels.containsKey(colo)) {
-            initializeFor(colo);
-        }
-        return entityManagerChannels.get(colo);
+    private String getExtensionName(String jobName) {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobDetails = metaStore.getExtensionJobDetails(jobName);
+        return extensionJobDetails.getExtensionName();
     }
 
     @POST
@@ -396,9 +393,8 @@
 
     protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request)
         throws FalconException, JAXBException, IOException {
-
-        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
-            for(final Entity entity : entry.getValue()){
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (final Entity entity : entry.getValue()) {
                 final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request);
                 final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
                 final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
@@ -411,8 +407,9 @@
 
                     @Override
                     protected APIResult doExecute(String colo) throws FalconException {
-                        return getEntityManager(colo).invoke("schedule", bufferedRequest, entity.getEntityType().toString(),
-                            entity.getName(), colo, Boolean.FALSE, "");
+                        return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest,
+                                entity.getEntityType().toString(),
+                                entity.getName(), colo, Boolean.FALSE, "");
                     }
                 }.execute();
             }
@@ -426,7 +423,7 @@
         return new BufferedRequest(request);
     }
 
-    protected void submitEntities(String extensionName, String jobName,
+    private void submitEntities(String extensionName, String jobName,
                                   SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
                                   HttpServletRequest request) throws FalconException, IOException, JAXBException {
         List<Entity> feeds = entityMap.get(EntityType.FEED);
@@ -436,28 +433,17 @@
         List<String> feedNames = new ArrayList<>();
         List<String> processNames = new ArrayList<>();
 
-        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
-            for(final Entity entity : entry.getValue()){
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (final Entity entity : entry.getValue()) {
                 final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
                 final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
-                new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
-                    @Override
-                    protected Set<String> getColosToApply() {
-                        return colos;
-                    }
-
-                    @Override
-                    protected APIResult doExecute(String colo) throws FalconException {
-                        return getConfigSyncChannel(colo).invoke("submit", bufferedRequest,
-                                entity.getEntityType().toString(), colo);
-                    }
-                }.execute();
+                entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos);
                 if (!embeddedMode) {
                     super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
                 }
-                if (entity.getEntityType().equals(EntityType.FEED)){
+                if (entity.getEntityType().equals(EntityType.FEED)) {
                     feedNames.add(entity.getName());
-                }else{
+                } else {
                     processNames.add(entity.getName());
                 }
             }
@@ -471,24 +457,49 @@
         metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
     }
 
-    private void initializeFor(String colo) throws FalconException {
-        entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
-        configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
-    }
+    private void updateEntities(String extensionName, String jobName,
+                                SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
+                                HttpServletRequest request) throws FalconException, IOException, JAXBException {
+        List<Entity> feeds = entityMap.get(EntityType.FEED);
+        List<Entity> processes = entityMap.get(EntityType.PROCESS);
+        validateFeeds(feeds);
+        validateProcesses(processes);
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
 
-    private Channel getConfigSyncChannel(String colo) throws FalconException {
-        if (!configSyncChannels.containsKey(colo)) {
-            initializeFor(colo);
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (final Entity entity : entry.getValue()) {
+                final String entityType = entity.getEntityType().toString();
+                final String entityName = entity.getName();
+                final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
+                entityProxyUtil.proxyUpdate(entityType, entityName, Boolean.FALSE, bufferedRequest, entity);
+                if (!embeddedMode) {
+                    super.update(bufferedRequest, entity.getEntityType().toString(), entity.getName(), currentColo,
+                            Boolean.FALSE);
+                }
+                if (entity.getEntityType().equals(EntityType.FEED)) {
+                    feedNames.add(entity.getName());
+                } else {
+                    processNames.add(entity.getName());
+                }
+            }
         }
-        return configSyncChannels.get(colo);
+
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        byte[] configBytes = null;
+        if (configStream != null) {
+            configBytes = IOUtils.toByteArray(configStream);
+        }
+        metaStore.updateExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
     }
 
-    private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request) throws IOException, JAXBException {
+    private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request)
+        throws IOException, JAXBException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         type.getMarshaller().marshal(entity, baos);
         final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(baos.toByteArray());
-        ServletInputStream servletInputStream=new ServletInputStream(){
+        ServletInputStream servletInputStream = new ServletInputStream() {
             public int read() throws IOException {
                 return byteArrayInputStream.read();
             }
@@ -506,7 +517,7 @@
     private void validateProcesses(List<Entity> processes) throws FalconException {
         ProcessEntityParser processEntityParser = new ProcessEntityParser();
         for (Entity process : processes) {
-            processEntityParser.validate((Process)process, false);
+            processEntityParser.validate((Process) process, false);
         }
     }
 
@@ -531,21 +542,25 @@
     }
 
     @POST
-    @Path("update/{extension-name}")
-    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Path("update/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
     public APIResult update(
-            @PathParam("extension-name") String extensionName,
+            @PathParam("job-name") String jobName,
             @Context HttpServletRequest request,
-            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+            @DefaultValue("") @QueryParam("doAs") String doAsUser,
+            @FormDataParam("processes") List<FormDataBodyPart> processForms,
+            @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+            @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
+
+        SortedMap<EntityType, List<Entity>> entityMap;
+        String extensionName = getExtensionName(jobName);
         try {
-            List<Entity> entities = generateEntities(extensionName, request.getInputStream());
-            for (Entity entity : entities) {
-                super.update(entity, entity.getEntityType().name(), entity.getName(), null);
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when updating extension job: ", e);
+            entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
+            updateEntities(extensionName, jobName, entityMap, config, request);
+        } catch (FalconException | IOException | JAXBException e) {
+            LOG.error("Error while updating extension job: " + jobName, e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
@@ -634,7 +649,7 @@
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
     @Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML})
     public APIResult deleteExtensionMetadata(
-            @PathParam("extension-name") String extensionName){
+            @PathParam("extension-name") String extensionName) {
         checkIfExtensionServiceIsEnabled();
         try {
             return super.deleteExtensionMetadata(extensionName);
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index ed1054c..74a1acc 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -20,7 +20,6 @@
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
 import org.apache.falcon.FalconWebException;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
@@ -32,14 +31,12 @@
 import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.resource.AbstractSchedulableEntityManager;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.resource.SchedulableEntityInstanceResult;
-import org.apache.falcon.resource.channel.Channel;
-import org.apache.falcon.resource.channel.ChannelFactory;
-import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.util.DeploymentUtil;
 
 import javax.servlet.http.HttpServletRequest;
@@ -64,47 +61,13 @@
  */
 @Path("entities")
 public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityManager {
-    private static final String PRISM_TAG = "prism";
-    public static final String FALCON_TAG = "falcon";
+    static final String PRISM_TAG = "prism";
+    static final String FALCON_TAG = "falcon";
 
-    private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>();
-    private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>();
+    private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();
     private boolean embeddedMode = DeploymentUtil.isEmbeddedMode();
     private String currentColo = DeploymentUtil.getCurrentColo();
 
-    public SchedulableEntityManagerProxy() {
-        try {
-            Set<String> colos = getAllColos();
-
-            for (String colo : colos) {
-                initializeFor(colo);
-            }
-
-            DeploymentUtil.setPrismMode();
-        } catch (FalconException e) {
-            throw new FalconRuntimException("Unable to initialize channels", e);
-        }
-    }
-
-    private void initializeFor(String colo) throws FalconException {
-        entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
-        configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
-    }
-
-    private Channel getConfigSyncChannel(String colo) throws FalconException {
-        if (!configSyncChannels.containsKey(colo)) {
-            initializeFor(colo);
-        }
-        return configSyncChannels.get(colo);
-    }
-
-    private Channel getEntityManager(String colo) throws FalconException {
-        if (!entityManagerChannels.containsKey(colo)) {
-            initializeFor(colo);
-        }
-        return entityManagerChannels.get(colo);
-    }
-
     private BufferedRequest getBufferedRequest(HttpServletRequest request) {
         if (request instanceof BufferedRequest) {
             return (BufferedRequest) request;
@@ -136,8 +99,8 @@
 
             @Override
             protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName,
-                        start, end, colo);
+                return entityProxyUtil.getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType,
+                        entityName, start, end, colo);
             }
         }.execute();
     }
@@ -162,24 +125,13 @@
         final HttpServletRequest bufferedRequest = getBufferedRequest(request);
 
         final Entity entity = getEntity(bufferedRequest, type);
-        Map<String, APIResult> results = new HashMap<String, APIResult>();
+        Map<String, APIResult> results = new HashMap<>();
         final Set<String> colos = getApplicableColos(type, entity);
 
         entityHasExtensionJobTag(entity);
         validateEntity(entity, colos);
 
-        results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) {
-            @Override
-            protected Set<String> getColosToApply() {
-                return colos;
-            }
-
-            @Override
-            protected APIResult doExecute(String colo) throws FalconException {
-                return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
-            }
-        }.execute());
-
+        results.putAll(entityProxyUtil.proxySubmit(type, bufferedRequest, entity, colos));
         if (!embeddedMode) {
             results.put(PRISM_TAG, super.submit(bufferedRequest, type, currentColo));
         }
@@ -240,7 +192,8 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("validate", bufferedRequest, type, skipDryRun);
+                return entityProxyUtil.getEntityManager(colo).invoke("validate", bufferedRequest, type,
+                        skipDryRun);
             }
         }.execute();
     }
@@ -287,7 +240,8 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
+                return entityProxyUtil.getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName,
+                        colo);
             }
         }.execute());
 
@@ -326,58 +280,10 @@
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
         Entity newEntity = getEntity(bufferedRequest, type);
         entityHasExtensionJobTag(newEntity);
-        final Set<String> oldColos = getApplicableColos(type, entityName);
-        final Set<String> newColos = getApplicableColos(type, newEntity);
-        final Set<String> mergedColos = new HashSet<String>();
-        mergedColos.addAll(oldColos);
-        mergedColos.retainAll(newColos);    //Common colos where update should be called
-        newColos.removeAll(oldColos);   //New colos where submit should be called
-        oldColos.removeAll(mergedColos);   //Old colos where delete should be called
 
-        Map<String, APIResult> results = new HashMap<String, APIResult>();
+        Map<String, APIResult> results = new HashMap<>();
         boolean result = true;
-        if (!oldColos.isEmpty()) {
-            results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) {
-                @Override
-                protected Set<String> getColosToApply() {
-                    return oldColos;
-                }
-
-                @Override
-                protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
-                }
-            }.execute());
-        }
-
-        if (!mergedColos.isEmpty()) {
-            results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) {
-                @Override
-                protected Set<String> getColosToApply() {
-                    return mergedColos;
-                }
-
-                @Override
-                protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName,
-                            colo, skipDryRun);
-                }
-            }.execute());
-        }
-
-        if (!newColos.isEmpty()) {
-            results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) {
-                @Override
-                protected Set<String> getColosToApply() {
-                    return newColos;
-                }
-
-                @Override
-                protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
-                }
-            }.execute());
-        }
+        results.putAll(entityProxyUtil.proxyUpdate(type, entityName, skipDryRun, bufferedRequest, newEntity));
 
         for (APIResult apiResult : results.values()) {
             if (apiResult.getStatus() != APIResult.Status.SUCCEEDED) {
@@ -451,8 +357,8 @@
 
                 @Override
                 protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("updateClusterDependents", clusterName,
-                            colo, skipDryRun);
+                    return entityProxyUtil.getConfigSyncChannel(colo).invoke("updateClusterDependents",
+                            clusterName, colo, skipDryRun);
                 }
             }.execute());
         }
@@ -497,7 +403,7 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun);
+                return entityProxyUtil.getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun);
             }
         }.execute();
     }
@@ -527,7 +433,8 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("getStatus", type, entity, colo, showScheduler);
+                return entityProxyUtil.getEntityManager(colo).invoke("getStatus", type, entity, colo,
+                        showScheduler);
             }
         }.execute();
     }
@@ -592,8 +499,8 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun,
-                        properties);
+                return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
+                        colo, skipDryRun, properties);
             }
         }.execute();
     }
@@ -654,7 +561,8 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, colo);
+                return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
+                        colo);
             }
         }.execute();
     }
@@ -686,7 +594,8 @@
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, colo);
+                return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
+                        colo);
             }
         }.execute();
     }
@@ -811,9 +720,9 @@
 
             @Override
             protected EntitySummaryResult doExecute(String colo) throws FalconException {
-                EntitySummaryResult es = getEntityManager(colo).invoke("getEntitySummary", type, clusterName, startStr,
-                    endStr, entityFields, entityFilter, entityTags, entityOrderBy, entitySortOrder, entityOffset,
-                    numEntities, numInstanceResults, doAsUser);
+                EntitySummaryResult es = entityProxyUtil.getEntityManager(colo).invoke("getEntitySummary", type,
+                        clusterName, startStr, endStr, entityFields, entityFilter, entityTags, entityOrderBy,
+                        entitySortOrder, entityOffset, numEntities, numInstanceResults, doAsUser);
                 return es;
             }
         }.execute();
@@ -844,7 +753,7 @@
 
             @Override
             protected FeedLookupResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("reverseLookup", type, path);
+                return entityProxyUtil.getEntityManager(colo).invoke("reverseLookup", type, path);
             }
         }.execute();
 
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 7688619..23f4cf1 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -146,12 +146,14 @@
             return;
         }
         Process newProcess = (Process) newEntity;
+        Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
         if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
-            backlogMetricStore.deleteEntityInstance(newProcess.getName());
-            entityBacklogs.remove(newProcess);
-            Process process = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
-            for(Cluster cluster : process.getClusters().getClusters()){
-                dropMetric(cluster.getName(), process);
+            if (oldProcess.getSla() != null) {
+                backlogMetricStore.deleteEntityInstance(newProcess.getName());
+                entityBacklogs.remove(newProcess);
+                for (Cluster cluster : oldProcess.getClusters().getClusters()) {
+                    dropMetric(cluster.getName(), oldProcess);
+                }
             }
         } else {
             addToBacklog(newEntity);
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 3a3c5b2..66b8e9b 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -293,7 +293,8 @@
         }
     }
 
-    private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) {
+    private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName,
+                                                                     InputStream configStream) {
         List<Entity> entities = getEntities(extensionName, jobName, configStream);
         List<Entity> feeds = new ArrayList<>();
         List<Entity> processes = new ArrayList<>();
@@ -335,6 +336,18 @@
         }
     }
 
+    public APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) {
+        InputStream configStream = getServletInputStream(configPath);
+        try {
+            String extensionName = ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName();
+            SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream);
+            return localExtensionManager.updateExtensionJob(extensionName, jobName, configStream,
+                    entityMap);
+        } catch (FalconException | IOException e) {
+            throw new FalconCLIException("Failed in updating the extension job " + jobName);
+        }
+    }
+
     @Override
     public APIResult getExtensionJobDetails(final String jobName) {
         return localExtensionManager.getExtensionJobDetails(jobName);
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 4cf3ae4..7002dc8 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -86,6 +86,32 @@
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
     }
 
+    public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
+                                        SortedMap<EntityType, List<Entity>> entityMap)
+        throws FalconException, IOException {
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (Entity entity : entry.getValue()) {
+                update(entity, entity.getEntityType().toString(), entity.getName(), true);
+            }
+        }
+        byte[] configBytes = null;
+        if (configStream != null) {
+            configBytes = IOUtils.toByteArray(configStream);
+        }
+        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+            for (final Entity entity : entry.getValue()) {
+                if (entity.getEntityType().equals(EntityType.FEED)) {
+                    feedNames.add(entity.getName());
+                } else {
+                    processNames.add(entity.getName());
+                }
+            }
+        }
+        ExtensionStore.getMetaStore().updateExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
+        return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
+    }
 
     public APIResult registerExtensionMetadata(String extensionName, String packagePath , String description) {
         return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
@@ -100,7 +126,7 @@
     }
 
     public APIResult getExtensionDetails(String extensionName){
-        return super.getExtensionJobDetail(extensionName);
+        return super.getExtensionDetail(extensionName);
     }
 
     public APIResult getExtensions(){
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 0771b9d..690fdd5 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -241,6 +241,11 @@
         return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser);
     }
 
+    APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) {
+        return falconUnitClient.updateExtensionJob(jobName, configPath, doAsUser);
+    }
+
+
     public static String overlayParametersOverTemplate(String template,
                                                        Map<String, String> overlay) throws IOException {
         File tmpFile = getTempFile();
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 293bb23..07d8195 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -70,7 +70,8 @@
     private static final String WORKFLOW = "workflow.xml";
     private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
     private static final String EXTENSION_PATH = "/projects/falcon/extension/testExtension";
-    public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
+    private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
+    private static final String EXTENSION_PROPERTIES = "extension.properties";
     private FileSystem fileSystem;
 
     private static final String STORAGE_URL = "jail://global:00";
@@ -445,6 +446,20 @@
         result = getExtensionJobDetails("testJob");
         JSONObject resultJson = new JSONObject(result);
         Assert.assertEquals(resultJson.get("extensionName"), "testExtension");
+        Process process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
+        Assert.assertEquals(process.getPipelines(), "testPipeline");
+
+        apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
+        assertStatus(apiResult);
+        Assert.assertEquals(apiResult.getMessage(), "RUNNING");
+
+        apiResult = updateExtensionJob("testJob", getAbsolutePath(EXTENSION_PROPERTIES), null);
+        assertStatus(apiResult);
+
+        String processes = new JSONObject(getExtensionJobDetails("testJob")).get("processes").toString();
+        Assert.assertEquals(processes, "sample");
+        process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
+        Assert.assertEquals(process.getPipelines(), "testSample");
 
         apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
         assertStatus(apiResult);
diff --git a/unit/src/test/resources/extension.properties b/unit/src/test/resources/extension.properties
index d52de1e..0f2d7e8 100644
--- a/unit/src/test/resources/extension.properties
+++ b/unit/src/test/resources/extension.properties
@@ -20,4 +20,4 @@
 ####    This is used for falcon packaging only. ####
 ####################################################
 
-pipelines.name=test
+pipelines.name=testSample