FALCON-2200 Update API support for extension job (user extension)
Author: sandeep <sandysmdl@gmail.com>
Reviewers: @pallavi-rao
Closes #331 from sandeepSamudrala/FALCON-2200 and squashes the following commits:
737fad3 [sandeep] FALCON-2200 fixed checkstyle issues. removed unused imports
1780416 [sandeep] Incorporated review comments. Removed entitychannel and config channel from ExtensionManager Proxy as they are now used from proxyUtil
8a4d035 [sandeep] FALCON-2200 Incorporated review comments. Moved common code from proxies to proxyutil and making 2 api calls to get location in case of update extension
c8d0ab7 [sandeep] FALCON-2200 Adding changes related to clusters being removed and clusters being added into entity definition
cc7c9e9 [sandeep] FALCON-2200 Update API support for extension job (user extension)
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 59538bc..aa436da 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -110,12 +110,13 @@
result = client.registerExtension(extensionName, path, description).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
- validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+ validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
- result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
+ result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
index 11122e2..5e2f26d 100644
--- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -54,6 +54,7 @@
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
+ private static final String NAME = "name";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";
@@ -220,4 +221,14 @@
}
return extensionType;
}
+
+ public static String getExtensionName(String jobName, JSONObject extensionJobDetailJson) {
+ String extensionType;
+ try {
+ extensionType = extensionJobDetailJson.get(NAME).toString();
+ } catch (JSONException e) {
+ throw new FalconCLIException("Failed to get extension name for the given extension job:" + jobName, e);
+ }
+ return extensionType;
+ }
}
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 879d794..8cdbf30 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -222,6 +222,16 @@
String doAsUser);
/**
+ * Prepares set of entities the extension has implemented and stage them to a local directory and updates them.
+ * @param jobName name to be used in all the extension entities' tagging that are built as part of
+ * loadAndPrepare.
+ * @param configPath path to extension parameters.
+ * @return
+ * @throws FalconCLIException
+ */
+ public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);
+
+ /**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
* @return
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index e03e82d..9adb142 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1028,9 +1028,12 @@
}
public APIResult getExtensionJobDetails(final String jobName) {
- ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
+ return getResponse(APIResult.class, getExtensionJobDetailsResponse(jobName));
+ }
+
+ private ClientResponse getExtensionJobDetailsResponse(final String jobName) {
+ return new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
.call(ExtensionOperations.JOB_DETAILS);
- return getResponse(APIResult.class, clientResponse);
}
private ClientResponse getExtensionDetailResponse(final String extensionName) {
@@ -1097,7 +1100,11 @@
private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
- JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
+ JSONObject extensionDetailJson;
+ if (StringUtils.isBlank(extensionName)) {
+ extensionName = ExtensionHandler.getExtensionName(jobName, getExtensionJobDetailJson(jobName));
+ }
+ extensionDetailJson = getExtensionDetailJson(extensionName);
String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
return getEntities(extensionName, jobName, configStream, extensionType,
@@ -1115,6 +1122,16 @@
}
return extensionDetailJson;
}
+ private JSONObject getExtensionJobDetailJson(String jobName) {
+ ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
+ JSONObject extensionJobDetailJson;
+ try {
+ extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
+ } catch (JSONException e) {
+ throw new FalconCLIException("Failed to get details for the given extension", e);
+ }
+ return extensionJobDetailJson;
+ }
private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
String extensionType, String extensionBuildLocation) {
@@ -1144,12 +1161,12 @@
return getResponse(APIResult.class, clientResponse);
}
- public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser) {
- InputStream entityStream = getServletInputStream(filePath);
+ public APIResult updateExtensionJob(final String jobName, final String configPath, final String doAsUser) {
+ FormDataMultiPart entitiesForm = getEntitiesForm(null, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
- .path(ExtensionOperations.UPDATE.path, extensionName)
+ .path(ExtensionOperations.UPDATE.path, jobName)
.addQueryParam(DO_AS_OPT, doAsUser)
- .call(ExtensionOperations.UPDATE, entityStream);
+ .call(ExtensionOperations.UPDATE, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 6ad887e..cb8c816 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -272,12 +272,12 @@
"create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
- String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+ String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
writer.println();
writer.println(CREATE_FALCON_DB_PROPS);
- writer.println(insertDbVerion);
+ writer.println(insertDbVersion);
writer.close();
System.out.println("Create FALCON_DB_PROPS table");
if (run) {
@@ -287,7 +287,7 @@
conn.setAutoCommit(true);
st = conn.createStatement();
st.executeUpdate(CREATE_FALCON_DB_PROPS);
- st.executeUpdate(insertDbVerion);
+ st.executeUpdate(insertDbVersion);
st.close();
} catch (Exception ex) {
closeStatement(st);
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 277cb95..03f98f6 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -172,6 +172,23 @@
}
}
+ public void updateExtensionJob(String jobName, String extensionName, List<String> feedNames,
+ List<String> processNames, byte[] configBytes) {
+ EntityManager entityManager = getEntityManager();
+ ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
+ extensionJobsBean.setJobName(jobName);
+ extensionJobsBean.setExtensionName(extensionName);
+ extensionJobsBean.setFeeds(feedNames);
+ extensionJobsBean.setProcesses(processNames);
+ extensionJobsBean.setConfig(configBytes);
+ try {
+ beginTransaction(entityManager);
+ entityManager.merge(extensionJobsBean);
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
public ExtensionJobsBean getExtensionJobDetails(String jobName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
diff --git a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
index 432e37b..d3de2e6 100644
--- a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
+++ b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
@@ -21,29 +21,44 @@
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Schema;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.ExtensionBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBException;
+import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
/**
* Extension Example for testing extension loading and preparing entities.
*/
public class ExtensionExample implements ExtensionBuilder{
+ public static final Logger LOG = LoggerFactory.getLogger(ExtensionExample.class);
public static final String PROCESS_XML = "/extension-example.xml";
@Override
public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException {
- Entity process;
+ Process process;
try {
- process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal(
+ process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
getClass().getResourceAsStream(PROCESS_XML));
} catch (JAXBException e) {
throw new FalconException("Failed in un-marshalling the entity");
}
+ if (extensionConfigStream != null) {
+ Properties properties = new Properties();
+ try {
+ properties.load(extensionConfigStream);
+ } catch (IOException e) {
+ LOG.warn("Not able to load the configStream");
+ }
+ process.setPipelines(properties.getProperty("pipelines.name"));
+ }
List<Entity> entities = new ArrayList<>();
entities.add(process);
return entities;
@@ -52,7 +67,6 @@
@Override
public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream)
throws FalconException {
-
}
@Override
diff --git a/extensions/src/test/resources/extension-example.xml b/extensions/src/test/resources/extension-example.xml
index 4a2a982..bb391e4 100644
--- a/extensions/src/test/resources/extension-example.xml
+++ b/extensions/src/test/resources/extension-example.xml
@@ -27,7 +27,6 @@
<parallel>1</parallel>
<order>LIFO</order>
<frequency>hours(1)</frequency>
- <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
<!-- how -->
<properties>
<property name="name1" value="value1"/>
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
index 621974d..8bb8bbb 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -82,7 +82,7 @@
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
- try{
+ try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index aefd699..81b0448 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -116,7 +116,7 @@
}
}
- protected Set<String> getAllColos() {
+ public static Set<String> getAllColos() {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
}
@@ -141,7 +141,7 @@
return colos;
}
- protected Set<String> getApplicableColos(String type, String name) {
+ public static Set<String> getApplicableColos(String type, String name) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
@@ -157,7 +157,7 @@
}
}
- protected Set<String> getApplicableColos(String type, Entity entity) {
+ public static Set<String> getApplicableColos(String type, Entity entity) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index d360370..9fb0dd4 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -42,7 +42,7 @@
public static final Logger LOG = LoggerFactory.getLogger(AbstractExtensionManager.class);
private static final String JOB_NAME = "jobName";
- public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
+ protected static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
private static final String EXTENSION_NAME = "extensionName";
private static final String FEEDS = "feeds";
private static final String PROCESSES = "processes";
@@ -50,19 +50,19 @@
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";
- private static final String NAME = "name";
- private static final String EXTENSION_TYPE = "type";
- private static final String EXTENSION_DESC = "description";
- private static final String EXTENSION_LOCATION = "location";
+ public static final String NAME = "name";
+ protected static final String EXTENSION_TYPE = "type";
+ protected static final String EXTENSION_DESC = "description";
+ protected static final String EXTENSION_LOCATION = "location";
- public static void validateExtensionName(final String extensionName) {
+ protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank",
Response.Status.BAD_REQUEST);
}
}
- public APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
+ protected APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().registerExtension(extensionName, path,
@@ -80,7 +80,7 @@
}
}
- public APIResult getExtensionDetail(String extensionName) {
+ protected APIResult getExtensionDetail(String extensionName) {
try {
return new APIResult(APIResult.Status.SUCCEEDED, buildExtensionDetailResult(extensionName).toString());
} catch (FalconException e) {
@@ -112,6 +112,7 @@
if (jobsBean == null) {
throw new ValidationException("Job name not found:" + jobName);
}
+ ExtensionBean extensionBean = metaStore.getDetail(jobsBean.getExtensionName());
JSONObject detailsObject = new JSONObject();
try {
detailsObject.put(JOB_NAME, jobsBean.getJobName());
@@ -121,6 +122,8 @@
detailsObject.put(CONFIG, jobsBean.getConfig());
detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
+ detailsObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
+ detailsObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
} catch (JSONException e) {
LOG.error("Exception while building extension jon details for job {}", jobName, e);
}
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
new file mode 100644
index 0000000..a07a6d4
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.proxy;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.channel.Channel;
+import org.apache.falcon.resource.channel.ChannelFactory;
+import org.apache.falcon.util.DeploymentUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.falcon.resource.AbstractEntityManager.getAllColos;
+import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos;
+import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG;
+
+class EntityProxyUtil {
+ private final Map<String, Channel> entityManagerChannels = new HashMap<>();
+ private final Map<String, Channel> configSyncChannels = new HashMap<>();
+
+ EntityProxyUtil() {
+ try {
+ Set<String> colos = getAllColos();
+
+ for (String colo : colos) {
+ initializeFor(colo);
+ }
+
+ DeploymentUtil.setPrismMode();
+ } catch (FalconException e) {
+ throw new FalconRuntimException("Unable to initialize channels", e);
+ }
+ }
+ private void initializeFor(String colo) throws FalconException {
+ entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
+ configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
+ }
+
+ Channel getConfigSyncChannel(String colo) throws FalconException {
+ if (!configSyncChannels.containsKey(colo)) {
+ initializeFor(colo);
+ }
+ return configSyncChannels.get(colo);
+ }
+
+ Channel getEntityManager(String colo) throws FalconException {
+ if (!entityManagerChannels.containsKey(colo)) {
+ initializeFor(colo);
+ }
+ return entityManagerChannels.get(colo);
+ }
+
+ Map<String, APIResult> proxySubmit(final String type, final HttpServletRequest bufferedRequest,
+ final Entity entity, final Set<String> colos) {
+ Map<String, APIResult> results = new HashMap<>();
+ results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return colos;
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
+ }
+ }.execute());
+ return results;
+ }
+
+ Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
+ final HttpServletRequest bufferedRequest, Entity newEntity) {
+ final Set<String> oldColos = getApplicableColos(type, entityName);
+ final Set<String> newColos = getApplicableColos(type, newEntity);
+ final Set<String> mergedColos = new HashSet<>();
+ mergedColos.addAll(oldColos);
+ mergedColos.retainAll(newColos); //Common colos where update should be called
+ newColos.removeAll(oldColos); //New colos where submit should be called
+ oldColos.removeAll(mergedColos); //Old colos where delete should be called
+
+ Map<String, APIResult> results = new HashMap<>();
+ if (!oldColos.isEmpty()) {
+ results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return oldColos;
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getConfigSyncChannel(colo).invoke("delete", bufferedRequest,
+ type, entityName, colo);
+ }
+ }.execute());
+ }
+
+ if (!mergedColos.isEmpty()) {
+ results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return mergedColos;
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getConfigSyncChannel(colo).invoke("update", bufferedRequest,
+ type, entityName,
+ colo, skipDryRun);
+ }
+ }.execute());
+ }
+
+ if (!newColos.isEmpty()) {
+ results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return newColos;
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type,
+ colo);
+ }
+ }.execute());
+ }
+ return results;
+ }
+}
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 343ef6c..0e79f12 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -36,14 +36,13 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
+import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;
-import org.apache.falcon.resource.channel.Channel;
-import org.apache.falcon.resource.channel.ChannelFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.DeploymentUtil;
@@ -95,8 +94,7 @@
private boolean embeddedMode = DeploymentUtil.isEmbeddedMode();
private String currentColo = DeploymentUtil.getCurrentColo();
- private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>();
- private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>();
+ private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();
private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json";
@@ -126,7 +124,7 @@
// sort by extension job name
List<String> jobNames = new ArrayList<>(groupedEntities.keySet());
switch (sortOrder.toLowerCase()) {
- case DESCENDING_SORT_ORDER :
+ case DESCENDING_SORT_ORDER:
Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
break;
default:
@@ -325,9 +323,9 @@
}
private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName, String jobName,
- List<FormDataBodyPart> feedForms,
- List<FormDataBodyPart> processForms, InputStream config)
- throws FalconException, IOException{
+ List<FormDataBodyPart> feedForms,
+ List<FormDataBodyPart> processForms, InputStream config)
+ throws FalconException, IOException {
List<Entity> processes = getProcesses(processForms);
List<Entity> feeds = getFeeds(feedForms);
ExtensionType extensionType = getExtensionType(extensionName);
@@ -362,11 +360,10 @@
return extensionDetails.getExtensionType();
}
- private Channel getEntityManager(String colo) throws FalconException {
- if (!entityManagerChannels.containsKey(colo)) {
- initializeFor(colo);
- }
- return entityManagerChannels.get(colo);
+ private String getExtensionName(String jobName) {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobDetails = metaStore.getExtensionJobDetails(jobName);
+ return extensionJobDetails.getExtensionName();
}
@POST
@@ -396,9 +393,8 @@
protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request)
throws FalconException, JAXBException, IOException {
-
- for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
- for(final Entity entity : entry.getValue()){
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (final Entity entity : entry.getValue()) {
final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request);
final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
@@ -411,8 +407,9 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("schedule", bufferedRequest, entity.getEntityType().toString(),
- entity.getName(), colo, Boolean.FALSE, "");
+ return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest,
+ entity.getEntityType().toString(),
+ entity.getName(), colo, Boolean.FALSE, "");
}
}.execute();
}
@@ -426,7 +423,7 @@
return new BufferedRequest(request);
}
- protected void submitEntities(String extensionName, String jobName,
+ private void submitEntities(String extensionName, String jobName,
SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
HttpServletRequest request) throws FalconException, IOException, JAXBException {
List<Entity> feeds = entityMap.get(EntityType.FEED);
@@ -436,28 +433,17 @@
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
- for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
- for(final Entity entity : entry.getValue()){
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (final Entity entity : entry.getValue()) {
final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
- new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
- @Override
- protected Set<String> getColosToApply() {
- return colos;
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("submit", bufferedRequest,
- entity.getEntityType().toString(), colo);
- }
- }.execute();
+ entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos);
if (!embeddedMode) {
super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
}
- if (entity.getEntityType().equals(EntityType.FEED)){
+ if (entity.getEntityType().equals(EntityType.FEED)) {
feedNames.add(entity.getName());
- }else{
+ } else {
processNames.add(entity.getName());
}
}
@@ -471,24 +457,49 @@
metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
}
- private void initializeFor(String colo) throws FalconException {
- entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
- configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
- }
+ private void updateEntities(String extensionName, String jobName,
+ SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream,
+ HttpServletRequest request) throws FalconException, IOException, JAXBException {
+ List<Entity> feeds = entityMap.get(EntityType.FEED);
+ List<Entity> processes = entityMap.get(EntityType.PROCESS);
+ validateFeeds(feeds);
+ validateProcesses(processes);
+ List<String> feedNames = new ArrayList<>();
+ List<String> processNames = new ArrayList<>();
- private Channel getConfigSyncChannel(String colo) throws FalconException {
- if (!configSyncChannels.containsKey(colo)) {
- initializeFor(colo);
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (final Entity entity : entry.getValue()) {
+ final String entityType = entity.getEntityType().toString();
+ final String entityName = entity.getName();
+ final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
+ entityProxyUtil.proxyUpdate(entityType, entityName, Boolean.FALSE, bufferedRequest, entity);
+ if (!embeddedMode) {
+ super.update(bufferedRequest, entity.getEntityType().toString(), entity.getName(), currentColo,
+ Boolean.FALSE);
+ }
+ if (entity.getEntityType().equals(EntityType.FEED)) {
+ feedNames.add(entity.getName());
+ } else {
+ processNames.add(entity.getName());
+ }
+ }
}
- return configSyncChannels.get(colo);
+
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ byte[] configBytes = null;
+ if (configStream != null) {
+ configBytes = IOUtils.toByteArray(configStream);
+ }
+ metaStore.updateExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
}
- private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request) throws IOException, JAXBException {
+ private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request)
+ throws IOException, JAXBException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
type.getMarshaller().marshal(entity, baos);
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(baos.toByteArray());
- ServletInputStream servletInputStream=new ServletInputStream(){
+ ServletInputStream servletInputStream = new ServletInputStream() {
public int read() throws IOException {
return byteArrayInputStream.read();
}
@@ -506,7 +517,7 @@
private void validateProcesses(List<Entity> processes) throws FalconException {
ProcessEntityParser processEntityParser = new ProcessEntityParser();
for (Entity process : processes) {
- processEntityParser.validate((Process)process, false);
+ processEntityParser.validate((Process) process, false);
}
}
@@ -531,21 +542,25 @@
}
@POST
- @Path("update/{extension-name}")
- @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Path("update/{job-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult update(
- @PathParam("extension-name") String extensionName,
+ @PathParam("job-name") String jobName,
@Context HttpServletRequest request,
- @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ @DefaultValue("") @QueryParam("doAs") String doAsUser,
+ @FormDataParam("processes") List<FormDataBodyPart> processForms,
+ @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
+ @FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
+
+ SortedMap<EntityType, List<Entity>> entityMap;
+ String extensionName = getExtensionName(jobName);
try {
- List<Entity> entities = generateEntities(extensionName, request.getInputStream());
- for (Entity entity : entities) {
- super.update(entity, entity.getEntityType().name(), entity.getName(), null);
- }
- } catch (FalconException | IOException e) {
- LOG.error("Error when updating extension job: ", e);
+ entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
+ updateEntities(extensionName, jobName, entityMap, config, request);
+ } catch (FalconException | IOException | JAXBException e) {
+ LOG.error("Error while updating extension job: " + jobName, e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
@@ -634,7 +649,7 @@
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML})
public APIResult deleteExtensionMetadata(
- @PathParam("extension-name") String extensionName){
+ @PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
try {
return super.deleteExtensionMetadata(extensionName);
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index ed1054c..74a1acc 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -20,7 +20,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
@@ -32,14 +31,12 @@
import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.SchedulableEntityInstanceResult;
-import org.apache.falcon.resource.channel.Channel;
-import org.apache.falcon.resource.channel.ChannelFactory;
-import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.util.DeploymentUtil;
import javax.servlet.http.HttpServletRequest;
@@ -64,47 +61,13 @@
*/
@Path("entities")
public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityManager {
- private static final String PRISM_TAG = "prism";
- public static final String FALCON_TAG = "falcon";
+ static final String PRISM_TAG = "prism";
+ static final String FALCON_TAG = "falcon";
- private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>();
- private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>();
+ private EntityProxyUtil entityProxyUtil = new EntityProxyUtil();
private boolean embeddedMode = DeploymentUtil.isEmbeddedMode();
private String currentColo = DeploymentUtil.getCurrentColo();
- public SchedulableEntityManagerProxy() {
- try {
- Set<String> colos = getAllColos();
-
- for (String colo : colos) {
- initializeFor(colo);
- }
-
- DeploymentUtil.setPrismMode();
- } catch (FalconException e) {
- throw new FalconRuntimException("Unable to initialize channels", e);
- }
- }
-
- private void initializeFor(String colo) throws FalconException {
- entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
- configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
- }
-
- private Channel getConfigSyncChannel(String colo) throws FalconException {
- if (!configSyncChannels.containsKey(colo)) {
- initializeFor(colo);
- }
- return configSyncChannels.get(colo);
- }
-
- private Channel getEntityManager(String colo) throws FalconException {
- if (!entityManagerChannels.containsKey(colo)) {
- initializeFor(colo);
- }
- return entityManagerChannels.get(colo);
- }
-
private BufferedRequest getBufferedRequest(HttpServletRequest request) {
if (request instanceof BufferedRequest) {
return (BufferedRequest) request;
@@ -136,8 +99,8 @@
@Override
protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName,
- start, end, colo);
+ return entityProxyUtil.getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType,
+ entityName, start, end, colo);
}
}.execute();
}
@@ -162,24 +125,13 @@
final HttpServletRequest bufferedRequest = getBufferedRequest(request);
final Entity entity = getEntity(bufferedRequest, type);
- Map<String, APIResult> results = new HashMap<String, APIResult>();
+ Map<String, APIResult> results = new HashMap<>();
final Set<String> colos = getApplicableColos(type, entity);
entityHasExtensionJobTag(entity);
validateEntity(entity, colos);
- results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) {
- @Override
- protected Set<String> getColosToApply() {
- return colos;
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
- }
- }.execute());
-
+ results.putAll(entityProxyUtil.proxySubmit(type, bufferedRequest, entity, colos));
if (!embeddedMode) {
results.put(PRISM_TAG, super.submit(bufferedRequest, type, currentColo));
}
@@ -240,7 +192,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("validate", bufferedRequest, type, skipDryRun);
+ return entityProxyUtil.getEntityManager(colo).invoke("validate", bufferedRequest, type,
+ skipDryRun);
}
}.execute();
}
@@ -287,7 +240,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
+ return entityProxyUtil.getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName,
+ colo);
}
}.execute());
@@ -326,58 +280,10 @@
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
Entity newEntity = getEntity(bufferedRequest, type);
entityHasExtensionJobTag(newEntity);
- final Set<String> oldColos = getApplicableColos(type, entityName);
- final Set<String> newColos = getApplicableColos(type, newEntity);
- final Set<String> mergedColos = new HashSet<String>();
- mergedColos.addAll(oldColos);
- mergedColos.retainAll(newColos); //Common colos where update should be called
- newColos.removeAll(oldColos); //New colos where submit should be called
- oldColos.removeAll(mergedColos); //Old colos where delete should be called
- Map<String, APIResult> results = new HashMap<String, APIResult>();
+ Map<String, APIResult> results = new HashMap<>();
boolean result = true;
- if (!oldColos.isEmpty()) {
- results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) {
- @Override
- protected Set<String> getColosToApply() {
- return oldColos;
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
- }
- }.execute());
- }
-
- if (!mergedColos.isEmpty()) {
- results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) {
- @Override
- protected Set<String> getColosToApply() {
- return mergedColos;
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName,
- colo, skipDryRun);
- }
- }.execute());
- }
-
- if (!newColos.isEmpty()) {
- results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) {
- @Override
- protected Set<String> getColosToApply() {
- return newColos;
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
- }
- }.execute());
- }
+ results.putAll(entityProxyUtil.proxyUpdate(type, entityName, skipDryRun, bufferedRequest, newEntity));
for (APIResult apiResult : results.values()) {
if (apiResult.getStatus() != APIResult.Status.SUCCEEDED) {
@@ -451,8 +357,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("updateClusterDependents", clusterName,
- colo, skipDryRun);
+ return entityProxyUtil.getConfigSyncChannel(colo).invoke("updateClusterDependents",
+ clusterName, colo, skipDryRun);
}
}.execute());
}
@@ -497,7 +403,7 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun);
+ return entityProxyUtil.getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun);
}
}.execute();
}
@@ -527,7 +433,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("getStatus", type, entity, colo, showScheduler);
+ return entityProxyUtil.getEntityManager(colo).invoke("getStatus", type, entity, colo,
+ showScheduler);
}
}.execute();
}
@@ -592,8 +499,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun,
- properties);
+ return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
+ colo, skipDryRun, properties);
}
}.execute();
}
@@ -654,7 +561,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, colo);
+ return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
+ colo);
}
}.execute();
}
@@ -686,7 +594,8 @@
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, colo);
+ return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
+ colo);
}
}.execute();
}
@@ -811,9 +720,9 @@
@Override
protected EntitySummaryResult doExecute(String colo) throws FalconException {
- EntitySummaryResult es = getEntityManager(colo).invoke("getEntitySummary", type, clusterName, startStr,
- endStr, entityFields, entityFilter, entityTags, entityOrderBy, entitySortOrder, entityOffset,
- numEntities, numInstanceResults, doAsUser);
+ EntitySummaryResult es = entityProxyUtil.getEntityManager(colo).invoke("getEntitySummary", type,
+ clusterName, startStr, endStr, entityFields, entityFilter, entityTags, entityOrderBy,
+ entitySortOrder, entityOffset, numEntities, numInstanceResults, doAsUser);
return es;
}
}.execute();
@@ -844,7 +753,7 @@
@Override
protected FeedLookupResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("reverseLookup", type, path);
+ return entityProxyUtil.getEntityManager(colo).invoke("reverseLookup", type, path);
}
}.execute();
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 7688619..23f4cf1 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -146,12 +146,14 @@
return;
}
Process newProcess = (Process) newEntity;
+ Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
- backlogMetricStore.deleteEntityInstance(newProcess.getName());
- entityBacklogs.remove(newProcess);
- Process process = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
- for(Cluster cluster : process.getClusters().getClusters()){
- dropMetric(cluster.getName(), process);
+ if (oldProcess.getSla() != null) {
+ backlogMetricStore.deleteEntityInstance(newProcess.getName());
+ entityBacklogs.remove(newProcess);
+ for (Cluster cluster : oldProcess.getClusters().getClusters()) {
+ dropMetric(cluster.getName(), oldProcess);
+ }
}
} else {
addToBacklog(newEntity);
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 3a3c5b2..66b8e9b 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -293,7 +293,8 @@
}
}
- private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) {
+ private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName,
+ InputStream configStream) {
List<Entity> entities = getEntities(extensionName, jobName, configStream);
List<Entity> feeds = new ArrayList<>();
List<Entity> processes = new ArrayList<>();
@@ -335,6 +336,18 @@
}
}
+ public APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) {
+ InputStream configStream = getServletInputStream(configPath);
+ try {
+ String extensionName = ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName();
+ SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream);
+ return localExtensionManager.updateExtensionJob(extensionName, jobName, configStream,
+ entityMap);
+ } catch (FalconException | IOException e) {
+ throw new FalconCLIException("Failed in updating the extension job " + jobName);
+ }
+ }
+
@Override
public APIResult getExtensionJobDetails(final String jobName) {
return localExtensionManager.getExtensionJobDetails(jobName);
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 4cf3ae4..7002dc8 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -86,6 +86,32 @@
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}
+ public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
+ SortedMap<EntityType, List<Entity>> entityMap)
+ throws FalconException, IOException {
+ List<String> feedNames = new ArrayList<>();
+ List<String> processNames = new ArrayList<>();
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (Entity entity : entry.getValue()) {
+ update(entity, entity.getEntityType().toString(), entity.getName(), true);
+ }
+ }
+ byte[] configBytes = null;
+ if (configStream != null) {
+ configBytes = IOUtils.toByteArray(configStream);
+ }
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (final Entity entity : entry.getValue()) {
+ if (entity.getEntityType().equals(EntityType.FEED)) {
+ feedNames.add(entity.getName());
+ } else {
+ processNames.add(entity.getName());
+ }
+ }
+ }
+ ExtensionStore.getMetaStore().updateExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
+ return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
+ }
public APIResult registerExtensionMetadata(String extensionName, String packagePath , String description) {
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
@@ -100,7 +126,7 @@
}
public APIResult getExtensionDetails(String extensionName){
- return super.getExtensionJobDetail(extensionName);
+ return super.getExtensionDetail(extensionName);
}
public APIResult getExtensions(){
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 0771b9d..690fdd5 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -241,6 +241,11 @@
return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser);
}
+ APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) {
+ return falconUnitClient.updateExtensionJob(jobName, configPath, doAsUser);
+ }
+
+
public static String overlayParametersOverTemplate(String template,
Map<String, String> overlay) throws IOException {
File tmpFile = getTempFile();
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 293bb23..07d8195 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -70,7 +70,8 @@
private static final String WORKFLOW = "workflow.xml";
private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
private static final String EXTENSION_PATH = "/projects/falcon/extension/testExtension";
- public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
+ private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
+ private static final String EXTENSION_PROPERTIES = "extension.properties";
private FileSystem fileSystem;
private static final String STORAGE_URL = "jail://global:00";
@@ -445,6 +446,20 @@
result = getExtensionJobDetails("testJob");
JSONObject resultJson = new JSONObject(result);
Assert.assertEquals(resultJson.get("extensionName"), "testExtension");
+ Process process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
+ Assert.assertEquals(process.getPipelines(), "testPipeline");
+
+ apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
+ assertStatus(apiResult);
+ Assert.assertEquals(apiResult.getMessage(), "RUNNING");
+
+ apiResult = updateExtensionJob("testJob", getAbsolutePath(EXTENSION_PROPERTIES), null);
+ assertStatus(apiResult);
+
+ String processes = new JSONObject(getExtensionJobDetails("testJob")).get("processes").toString();
+ Assert.assertEquals(processes, "sample");
+ process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
+ Assert.assertEquals(process.getPipelines(), "testSample");
apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
assertStatus(apiResult);
diff --git a/unit/src/test/resources/extension.properties b/unit/src/test/resources/extension.properties
index d52de1e..0f2d7e8 100644
--- a/unit/src/test/resources/extension.properties
+++ b/unit/src/test/resources/extension.properties
@@ -20,4 +20,4 @@
#### This is used for falcon packaging only. ####
####################################################
-pipelines.name=test
+pipelines.name=testSample