YARN-4221. Store user in app to flow table (Varun Saxena via sjlee)
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7c40f8c..252d7b3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -138,6 +138,8 @@
YARN-4129. Refactor the SystemMetricPublisher in RM to better support
newer events (Naganarasimha G R via sjlee)
+ YARN-4221. Store user in app to flow table (Varun Saxena via sjlee)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index d82a402..d3ff8b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -265,11 +265,6 @@
return str == null ? null : str.trim();
}
- private static String parseUser(UserGroupInformation callerUGI, String user) {
- return (callerUGI != null && (user == null || user.isEmpty()) ?
- callerUGI.getUserName().trim() : parseStr(user));
- }
-
private static UserGroupInformation getUser(HttpServletRequest req) {
String remoteUser = req.getRemoteUser();
UserGroupInformation callerUGI = null;
@@ -389,7 +384,7 @@
Set<TimelineEntity> entities = null;
try {
entities = timelineReaderManager.getEntities(
- parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
+ parseStr(userId), parseStr(clusterId), parseStr(flowId),
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
parseLongStr(limit), parseLongStr(createdTimeStart),
parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
@@ -463,7 +458,7 @@
TimelineEntity entity = null;
try {
entity = timelineReaderManager.getEntity(
- parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
+ parseStr(userId), parseStr(clusterId), parseStr(flowId),
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) {
@@ -482,35 +477,35 @@
}
/**
- * Return a single flow run for the given cluster, flow id and run id.
+ * Return a single flow run for the given user, flow id and run id.
* Cluster ID is not provided by client so default cluster ID has to be taken.
*/
@GET
- @Path("/flowrun/{flowid}/{flowrunid}/")
+ @Path("/flowrun/{userid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getFlowRun(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId,
- @QueryParam("userid") String userId,
@QueryParam("fields") String fields) {
- return getFlowRun(req, res, null, flowId, flowRunId, userId, fields);
+ return getFlowRun(req, res, userId, null, flowId, flowRunId, fields);
}
/**
- * Return a single flow run for the given cluster, flow id and run id.
+ * Return a single flow run for the given user, cluster, flow id and run id.
*/
@GET
- @Path("/flowrun/{clusterid}/{flowid}/{flowrunid}/")
+ @Path("/flowrun/{userid}/{clusterid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getFlowRun(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId,
- @QueryParam("userid") String userId,
@QueryParam("fields") String fields) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
@@ -522,9 +517,8 @@
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
TimelineEntity entity = null;
try {
- entity = timelineReaderManager.getEntity(
- parseUser(callerUGI, userId), parseStr(clusterId),
- parseStr(flowId), parseLongStr(flowRunId), null,
+ entity = timelineReaderManager.getEntity(parseStr(userId),
+ parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null,
parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) {
@@ -543,37 +537,37 @@
}
/**
- * Return a set of flows runs for the given flow id.
+ * Return a set of flow runs for the given user and flow id.
* Cluster ID is not provided by client so default cluster ID has to be taken.
*/
@GET
- @Path("/flowruns/{flowid}/")
+ @Path("/flowruns/{userid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRuns(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("flowid") String flowId,
- @QueryParam("userid") String userId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("fields") String fields) {
- return getFlowRuns(req, res, null, flowId, userId, limit, createdTimeStart,
+ return getFlowRuns(req, res, userId, null, flowId, limit, createdTimeStart,
createdTimeEnd, fields);
}
/**
- * Return a set of flow runs for the given cluster and flow id.
+ * Return a set of flow runs for the given user, cluster and flow id.
*/
@GET
- @Path("/flowruns/{clusterid}/{flowid}/")
+ @Path("/flowruns/{userid}/{clusterid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRuns(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId,
- @QueryParam("userid") String userId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@@ -589,11 +583,11 @@
Set<TimelineEntity> entities = null;
try {
entities = timelineReaderManager.getEntities(
- parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
- null, null, TimelineEntityType.YARN_FLOW_RUN.toString(),
- parseLongStr(limit), parseLongStr(createdTimeStart),
- parseLongStr(createdTimeEnd), null, null, null, null, null, null,
- null, null, parseFieldsStr(fields, COMMA_DELIMITER));
+ parseStr(userId), parseStr(clusterId), parseStr(flowId), null, null,
+ TimelineEntityType.YARN_FLOW_RUN.toString(), parseLongStr(limit),
+ parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), null,
+ null, null, null, null, null, null, null,
+ parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) {
handleException(e, url, startTime, "createdTime start/end or limit");
}
@@ -730,10 +724,9 @@
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
TimelineEntity entity = null;
try {
- entity = timelineReaderManager.getEntity(
- parseUser(callerUGI, userId), parseStr(clusterId),
- parseStr(flowId), parseLongStr(flowRunId), parseStr(appId),
- TimelineEntityType.YARN_APPLICATION.toString(), null,
+ entity = timelineReaderManager.getEntity(parseStr(userId),
+ parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId),
+ parseStr(appId), TimelineEntityType.YARN_APPLICATION.toString(), null,
parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) {
handleException(e, url, startTime, "flowrunid");
@@ -750,20 +743,20 @@
}
/**
- * Return a list of apps for given flow id and flow run id. Cluster ID is not
- * provided by client so default cluster ID has to be taken. If number of
- * matching apps are more than the limit, most recent apps till the limit is
- * reached, will be returned.
+ * Return a list of apps for the given user, flow id and flow run id. Cluster
+ * ID is not provided by the client, so the default cluster ID is used. If
+ * more apps match than the limit allows, only the most recent apps, up to
+ * the limit, are returned.
*/
@GET
- @Path("/flowrunapps/{flowid}/{flowrunid}/")
+ @Path("/flowrunapps/{userid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRunApps(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId,
- @QueryParam("userid") String userId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@@ -784,20 +777,20 @@
}
/**
- * Return a list of apps for a given cluster id, flow id and flow run id. If
- * number of matching apps are more than the limit, most recent apps till the
- * limit is reached, will be returned.
+ * Return a list of apps for a given user, cluster id, flow id and flow run
+ * id. If more apps match than the limit allows, only the most recent apps,
+ * up to the limit, are returned.
*/
@GET
- @Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/")
+ @Path("/flowrunapps/{userid}/{clusterid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRunApps(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId,
- @QueryParam("userid") String userId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@@ -818,19 +811,19 @@
}
/**
- * Return a list of apps for given flow id. Cluster ID is not provided by
- * client so default cluster ID has to be taken. If number of matching apps
- * are more than the limit, most recent apps till the limit is reached, will
- * be returned.
+ * Return a list of apps for the given user and flow id. Cluster ID is not
+ * provided by the client, so the default cluster ID is used. If more apps
+ * match than the limit allows, only the most recent apps, up to the limit,
+ * are returned.
*/
@GET
- @Path("/flowapps/{flowid}/")
+ @Path("/flowapps/{userid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowApps(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("flowid") String flowId,
- @QueryParam("userid") String userId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@@ -851,19 +844,19 @@
}
/**
- * Return a list of apps for a given cluster id and flow id. If number of
- * matching apps are more than the limit, most recent apps till the limit is
- * reached, will be returned.
+ * Return a list of apps for a given user, cluster id and flow id. If more
+ * apps match than the limit allows, only the most recent apps, up to the
+ * limit, are returned.
*/
@GET
- @Path("/flowapps/{clusterid}/{flowid}/")
+ @Path("/flowapps/{userid}/{clusterid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowApps(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @PathParam("userid") String userId,
@PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId,
- @QueryParam("userid") String userId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index 6d1a2ff..8324afd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -90,12 +90,12 @@
@Override
protected void validateParams() {
- Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
if (singleEntityRead) {
Preconditions.checkNotNull(appId, "appId shouldn't be null");
} else {
+ Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
}
}
@@ -104,11 +104,12 @@
protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException {
if (singleEntityRead) {
- if (flowId == null || flowRunId == null) {
+ if (flowId == null || flowRunId == null || userId == null) {
FlowContext context =
lookupFlowContext(clusterId, appId, hbaseConf, conn);
flowId = context.flowId;
flowRunId = context.flowRunId;
+ userId = context.userId;
}
}
if (fieldsToRetrieve == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index bbca209..04fc8ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -100,6 +100,7 @@
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
return new FlowContext(
+ AppToFlowColumn.USER_ID.readResult(result).toString(),
AppToFlowColumn.FLOW_ID.readResult(result).toString(),
((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
} else {
@@ -110,9 +111,11 @@
}
protected static class FlowContext {
+ protected final String userId;
protected final String flowId;
protected final Long flowRunId;
- public FlowContext(String flowId, Long flowRunId) {
+ public FlowContext(String user, String flowId, Long flowRunId) {
+ this.userId = user;
this.flowId = flowId;
this.flowRunId = flowRunId;
}
@@ -120,7 +123,6 @@
@Override
protected void validateParams() {
- Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(appId, "appId shouldn't be null");
Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
@@ -132,12 +134,13 @@
@Override
protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException {
- // In reality both should be null or neither should be null
- if (flowId == null || flowRunId == null) {
+ // In reality either all three should be null or none should be null
+ if (flowId == null || flowRunId == null || userId == null) {
FlowContext context =
lookupFlowContext(clusterId, appId, hbaseConf, conn);
flowId = context.flowId;
flowRunId = context.flowRunId;
+ userId = context.userId;
}
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 3649865..a57be55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -162,8 +162,7 @@
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
// store in App to flow table
- storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId,
- appId, te);
+ storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
// store in flow run table
storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
@@ -200,11 +199,12 @@
}
private void storeInAppToFlowTable(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntity te) throws IOException {
+ String flowName, long flowRunId, String appId, TimelineEntity te)
+ throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+ AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
}
/*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
index 859fdca..7f1ecaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -28,7 +28,6 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import java.io.IOException;
-import java.util.Map;
/**
* Identifies fully qualified columns for the {@link AppToFlowTable}.
@@ -43,7 +42,12 @@
/**
* The flow run ID
*/
- FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id");
+ FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
+
+ /**
+ * The user
+ */
+ USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
private final ColumnHelper<AppToFlowTable> column;
private final ColumnFamily<AppToFlowTable> columnFamily;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
index 2467856..b30f253 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
@@ -49,6 +49,9 @@
* | | flowRunId: |
* | | 1452828720457 |
* | | |
+ * | | user_id: |
+ * | | admin |
+ * | | |
* | | |
* | | |
* |--------------------------------------|
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 4f53fe2..3b285aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -199,6 +199,18 @@
entity4.addEvent(event4);
te4.addEntity(entity4);
+ TimelineEntities te5 = new TimelineEntities();
+ TimelineEntity entity5 = new TimelineEntity();
+ entity5.setId("entity1");
+ entity5.setType("type1");
+ entity5.setCreatedTime(1425016501034L);
+ te5.addEntity(entity5);
+ TimelineEntity entity6 = new TimelineEntity();
+ entity6.setId("entity2");
+ entity6.setType("type1");
+ entity6.setCreatedTime(1425016501034L);
+ te5.addEntity(entity6);
+
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
@@ -209,6 +221,8 @@
hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
hbi.write(cluster, user, flow2,
flowVersion2, runid2, entity3.getId(), te3);
+ hbi.write(cluster, user, flow, flowVersion, runid,
+ "application_1111111111_1111", te5);
hbi.flush();
} finally {
hbi.close();
@@ -333,7 +347,7 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
+ "timeline/flowrun/user1/cluster1/flow_name/1002345678919");
ClientResponse resp = getResponse(client, uri);
FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -350,7 +364,7 @@
// Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrun/flow_name/1002345678919?userid=user1");
+ "timeline/flowrun/user1/flow_name/1002345678919");
resp = getResponse(client, uri);
entity = resp.getEntity(FlowRunEntity.class);
assertNotNull(entity);
@@ -374,7 +388,7 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowruns/cluster1/flow_name?userid=user1");
+ "timeline/flowruns/user1/cluster1/flow_name");
ClientResponse resp = getResponse(client, uri);
Set<FlowRunEntity> entities =
resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@@ -393,7 +407,7 @@
}
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowruns/cluster1/flow_name?userid=user1&limit=1");
+ "timeline/flowruns/user1/cluster1/flow_name?limit=1");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -408,7 +422,7 @@
}
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+ "timeline/flowruns/user1/cluster1/flow_name?" +
"createdtimestart=1425016501030");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@@ -424,7 +438,7 @@
}
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+ "timeline/flowruns/user1/cluster1/flow_name?" +
"createdtimestart=1425016500999&createdtimeend=1425016501035");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@@ -443,7 +457,7 @@
}
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+ "timeline/flowruns/user1/cluster1/flow_name?" +
"createdtimeend=1425016501030");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@@ -459,7 +473,7 @@
}
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics");
+ "timeline/flowruns/user1/cluster1/flow_name?fields=metrics");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -620,7 +634,7 @@
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/app/cluster1/application_1111111111_1111?" +
- "userid=user1&fields=ALL");
+ "fields=ALL");
ClientResponse resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
@@ -641,12 +655,48 @@
}
@Test
+ public void testGetEntityWithoutFlowInfo() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/entity/cluster1/application_1111111111_1111/type1/entity1");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("entity1", entity.getId());
+ assertEquals("type1", entity.getType());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesWithoutFlowInfo() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/entities/cluster1/application_1111111111_1111/type1");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
public void testGetFlowRunApps() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrunapps/cluster1/flow_name/1002345678919?" +
- "userid=user1&fields=ALL");
+ "timeline/flowrunapps/user1/cluster1/flow_name/1002345678919?" +
+ "fields=ALL");
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -662,14 +712,14 @@
// Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrunapps/flow_name/1002345678919?userid=user1");
+ "timeline/flowrunapps/user1/flow_name/1002345678919");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1");
+ "timeline/flowrunapps/user1/flow_name/1002345678919?limit=1");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
@@ -684,7 +734,7 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL");
+ "timeline/flowapps/user1/cluster1/flow_name?fields=ALL");
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -702,14 +752,14 @@
// Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/flow_name?userid=user1");
+ "timeline/flowapps/user1/flow_name");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/flow_name?userid=user1&limit=1");
+ "timeline/flowapps/user1/flow_name?limit=1");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
@@ -725,7 +775,7 @@
try {
String entityType = TimelineEntityType.YARN_APPLICATION.toString();
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" +
+ "timeline/flowapps/user1/cluster1/flow_name?eventfilters=" +
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
@@ -736,7 +786,7 @@
newEntity(entityType, "application_1111111111_1111")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" +
+ "timeline/flowapps/user1/cluster1/flow_name?metricfilters=" +
"HDFS_BYTES_READ");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -746,7 +796,7 @@
newEntity(entityType, "application_1111111111_1111")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" +
+ "timeline/flowapps/user1/cluster1/flow_name?conffilters=" +
"cfg1:value1");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -764,7 +814,7 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
+ "timeline/flowrun/user1/cluster1/flow_name/1002345678929");
verifyHttpResponse(client, uri, Status.NOT_FOUND);
} finally {
client.destroy();
@@ -793,8 +843,8 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/app/cluster1/flow_name/1002345678919/" +
- "application_1111111111_1378?userid=user1");
+ "timeline/app/user1/cluster1/flow_name/1002345678919/" +
+ "application_1111111111_1378");
verifyHttpResponse(client, uri, Status.NOT_FOUND);
} finally {
client.destroy();
@@ -806,7 +856,7 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrunapps/cluster2/flow_name/1002345678919");
+ "timeline/flowrunapps/user1/cluster2/flow_name/1002345678919");
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -823,7 +873,7 @@
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowapps/cluster2/flow_name55");
+ "timeline/flowapps/user1/cluster2/flow_name55");
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});