FALCON-2246 Api to list all the jobs in the system
Author: Pracheer Agarwal <pracheer.agarwal@inmobi.com>
Author: Pracheer Agarwal <pracheeragarwal@gmail.com>
Author: Pracheer Agarwal <pr@im2216-x0.corp.inmobi.com>
Author: sandeep <sandysmdl@gmail.com>
Reviewers: @sandeepSamudrala,@pallavi-rao
Closes #344 from PracheerAgarwal/FALCON-2246 and squashes the following commits:
3b7377f [Pracheer Agarwal] changes for review comments
001bcf6 [Pracheer Agarwal] changes for review comments
d9cc2c0 [sandeep] FALCON-2247 test cases added
e1d321f [Pracheer Agarwal] FALCON-2246 Api to list all the jobs in the system
c40d4a8 [Pracheer Agarwal] FALCON-2246, modification in list API to show all jobs in the system
f951450 [Pracheer Agarwal] skeleton of list api changes
d16eaf1 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2246
0915834 [Pracheer Agarwal] reverting the old changes
b31c758 [Pracheer Agarwal] back merge
abd9c71 [Pracheer Agarwal] FALCON-2246 Api to list all the jobs in the system
778c579 [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
e39808d [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
a932633 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
fda3b28 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
a93d71a [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
e3728d5 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
066c8e2 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
b20f044 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
7f572a1 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 15eb8d5..5d44128 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -150,8 +150,7 @@
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
- validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
- ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
+ ExtensionJobList jobs = client.getExtensionJobs(extensionName, doAsUser,
commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT));
result = jobs != null ? jobs.toString() : "No extension job (" + extensionName + ") found.";
} else if (optionsList.contains(INSTANCES_OPT)) {
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 49392c2..7ed669f 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -23,6 +23,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
@@ -295,6 +296,12 @@
public abstract APIResult enumerateExtensions();
/**
+ * Returns all registered jobs for an extension.
+ * @return
+ */
+ public abstract ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser);
+
+ /**
*
* Get list of the entities.
* We have two filtering parameters for entity tags: "tags" and "tagkeys".
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index aca83e3..25eaeb5 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1023,6 +1023,16 @@
return getResponse(APIResult.class, clientResponse);
}
+ @Override
+ public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.LIST.path, extensionName)
+ .addQueryParam(DO_AS_OPT, doAsUser)
+ .addQueryParam(SORT_ORDER, sortOrder)
+ .call(ExtensionOperations.LIST);
+ return getResponse(ExtensionJobList.class, clientResponse);
+ }
+
public APIResult unregisterExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.UNREGISTER.path, extensionName)
@@ -1246,16 +1256,6 @@
return getResponse(APIResult.class, clientResponse);
}
- public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser,
- final String sortOrder) {
- ClientResponse clientResponse = new ResourceBuilder()
- .path(ExtensionOperations.LIST.path, extensionName)
- .addQueryParam(DO_AS_OPT, doAsUser)
- .addQueryParam(SORT_ORDER, sortOrder)
- .call(ExtensionOperations.LIST);
- return getResponse(ExtensionJobList.class, clientResponse);
- }
-
public ExtensionInstanceList listExtensionInstance(final String jobName, final String doAsUser, final String fields,
final String start, final String end, final String status,
final String orderBy, final String sortOrder,
diff --git a/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java b/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java
index 244ea66..78fd5d4 100644
--- a/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java
+++ b/common-types/src/main/java/org/apache/falcon/resource/ExtensionJobList.java
@@ -21,8 +21,8 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
/**
* Extension job list used for marshalling / unmarshalling with REST calls.
@@ -36,19 +36,23 @@
int numJobs;
@XmlElementWrapper(name = "jobs")
- public List<String> job;
+ public Map<String, String> job;
public ExtensionJobList() {
numJobs = 0;
job = null;
}
- public ExtensionJobList(int numJobs) {
- this.numJobs = numJobs;
- job = new ArrayList<>();
+ public int getNumJobs() {
+ return numJobs;
}
- public ExtensionJobList(int numJobs, List<String> extensionJobNames) {
+ public ExtensionJobList(int numJobs) {
+ this.numJobs = numJobs;
+ job = new HashMap<>();
+ }
+
+ public ExtensionJobList(int numJobs, Map<String, String> extensionJobNames) {
this.numJobs = numJobs;
this.job = extensionJobNames;
}
@@ -57,8 +61,8 @@
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(numJobs).append("\n");
- for (String extensionJobNames : job) {
- builder.append(extensionJobNames);
+ for (Map.Entry<String, String> extensionJobs : job.entrySet()) {
+ builder.append(extensionJobs.getKey() + "\t" + extensionJobs.getValue() + "\n");
}
return builder.toString();
}
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
index acb5cf4..b6ac79d 100644
--- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
@@ -45,7 +45,7 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
- @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select a.jobName from ExtensionJobsBean a where a.extensionName = :extensionName")
+ @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 18c8540..81a4d2b 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -132,18 +132,18 @@
}
}
- public List<String> getJobsForAnExtension(String extensionName) {
+ public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
+ List<ExtensionJobsBean> extensionJobs = new ArrayList<>();
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
query.setParameter(EXTENSION_NAME, extensionName);
- List<String> jobNames = new ArrayList<>();
try {
- jobNames.addAll((List<String>) query.getResultList());
+ extensionJobs.addAll(query.getResultList());
+ return extensionJobs;
} finally {
commitAndCloseTransaction(entityManager);
}
- return jobNames;
}
public void deleteExtension(String extensionName) {
@@ -234,12 +234,14 @@
}
}
- List<ExtensionJobsBean> getAllExtensionJobs() {
+ public List<ExtensionJobsBean> getAllExtensionJobs() {
+ List<ExtensionJobsBean> extensionJobs = new ArrayList<>();
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS);
try {
- return q.getResultList();
+ extensionJobs.addAll(q.getResultList());
+ return extensionJobs;
} finally {
commitAndCloseTransaction(entityManager);
}
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
index 4c49445..45d58e0 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
@@ -87,7 +87,7 @@
stateStore.storeExtensionJob("job1", "test2", feeds, processes, config);
Assert.assertEquals(stateStore.getJobsForAnExtension("test2").size(), 1);
- Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0), "job1");
+ Assert.assertEquals(stateStore.getJobsForAnExtension("test2").get(0).getJobName(), "job1");
Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed");
stateStore.deleteExtensionJob("job1");
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index ab08e00..bb473f9 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -37,7 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -55,6 +59,8 @@
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";
+ protected static final String ASCENDING_SORT_ORDER = "asc";
+ protected static final String DESCENDING_SORT_ORDER = "desc";
public static final String NAME = "name";
public static final String STATUS = "status";
@@ -105,6 +111,39 @@
}
}
+ public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
+
+ Comparator<ExtensionJobsBean> compareByJobName = new Comparator<ExtensionJobsBean>() {
+ @Override
+ public int compare(ExtensionJobsBean o1, ExtensionJobsBean o2) {
+ return o1.getJobName().compareToIgnoreCase(o2.getJobName());
+ }
+ };
+
+ Map<String, String> jobAndExtensionNames = new HashMap<>();
+ List<ExtensionJobsBean> extensionJobs = null;
+ if (extensionName != null) {
+ extensionJobs = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
+ } else {
+ extensionJobs = ExtensionStore.getMetaStore().getAllExtensionJobs();
+ }
+
+ sortOrder = (sortOrder == null) ? ASCENDING_SORT_ORDER : sortOrder;
+ switch (sortOrder.toLowerCase()) {
+ case DESCENDING_SORT_ORDER:
+ Collections.sort(extensionJobs, Collections.reverseOrder(compareByJobName));
+ break;
+
+ default:
+ Collections.sort(extensionJobs, compareByJobName);
+ }
+
+ for (ExtensionJobsBean job : extensionJobs) {
+ jobAndExtensionNames.put(job.getJobName(), job.getExtensionName());
+ }
+ return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames);
+ }
+
public APIResult deleteExtensionMetadata(String extensionName) {
validateExtensionName(extensionName);
ExtensionStore metaStore = ExtensionStore.get();
@@ -119,7 +158,7 @@
private void canDeleteExtension(String extensionName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
+ List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
if (!extensionJobs.isEmpty()) {
LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
ArrayUtils.toString(extensionJobs));
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index efb5489..b6f405e 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -62,7 +62,6 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
-import java.util.Collections;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
@@ -84,8 +83,6 @@
public class ExtensionManagerProxy extends AbstractExtensionManager {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionManagerProxy.class);
- private static final String ASCENDING_SORT_ORDER = "asc";
- private static final String DESCENDING_SORT_ORDER = "desc";
private Extension extension = new Extension();
private static final String README = "README";
@@ -106,15 +103,7 @@
checkIfExtensionServiceIsEnabled();
checkIfExtensionExists(extensionName);
try {
- List<String> jobNames = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
- switch (sortOrder.toLowerCase()) {
- case DESCENDING_SORT_ORDER:
- Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
- break;
- default:
- Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER);
- }
- return new ExtensionJobList(jobNames.size(), jobNames);
+ return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
} catch (Throwable e) {
LOG.error("Failed to get extension job list of " + extensionName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 75aeba0..3150bbd 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -36,6 +36,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
@@ -415,6 +416,11 @@
}
@Override
+ public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
+ return localExtensionManager.getExtensionJobs(extensionName, sortOrder, doAsUser);
+ }
+
+ @Override
public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords,
String filterBy, String filterTags, String orderBy, String sortOrder,
Integer offset, Integer numResults, String doAsUser) {
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index ca39ddb..addb333 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -27,6 +27,7 @@
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
+import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.security.CurrentUser;
import java.io.IOException;
@@ -202,4 +203,8 @@
public APIResult getExtensions() {
return super.getExtensions();
}
+
+ public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
+ return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
+ }
}
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index e9367d5..2edd424 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -35,6 +35,7 @@
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.hadoop.JailedFileSystem;
import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.util.DateUtil;
import org.apache.hadoop.fs.Path;
@@ -244,6 +245,10 @@
return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
}
+ ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {
+ return falconUnitClient.getExtensionJobs(extensionName, sortOrder, doAsUser);
+ }
+
public APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser) {
return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser);
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 8949c41..8030f20 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -26,6 +26,7 @@
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
@@ -458,6 +459,10 @@
apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
+
+ ExtensionJobList extensionJobList = getExtensionJobs(TEST_EXTENSION, null, null);
+ Assert.assertEquals(extensionJobList.getNumJobs(), 1);
+
apiResult = getExtensionJobDetails(TEST_JOB);
JSONObject resultJson = new JSONObject(apiResult.getMessage());
Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);