YARN-3814. REST API implementation for getting raw entities in TimelineReader (Naganarasimha G R via sjlee)
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0041f7f..b1d2bd6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -97,6 +97,9 @@
     YARN-4025. Deal with byte representations of Longs in writer code.
     (Sangjin Lee and Vrushali C via junping_du)
 
+    YARN-3814. REST API implementation for getting raw entities in
+    TimelineReader (Naganarasimha G R via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
index 5573185..7fafd82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
@@ -18,10 +18,18 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 
 @Private
 @Unstable
@@ -33,4 +41,37 @@
     super(TimelineReaderManager.class.getName());
     this.reader = timelineReader;
   }
+
+  /**
+   * Get a set of entities matching given predicates. The meaning of each
+   * argument has been documented with {@link TimelineReader#getEntities}.
+   *
+   * @see TimelineReader#getEntities
+   */
+  Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String>  metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    return reader.getEntities(userId, clusterId, flowId, flowRunId, appId,
+        entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
+        metricFilters, eventFilters, fieldsToRetrieve);
+  }
+
+  /**
+   * Get single timeline entity. The meaning of each argument has been
+   * documented with {@link TimelineReader#getEntity}.
+   *
+   * @see TimelineReader#getEntity
+   */
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fields) throws IOException {
+    return reader.getEntity(userId, clusterId, flowId, flowRunId, appId,
+        entityType, entityId, fields);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index 874112c..319cfb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -54,7 +54,7 @@
 public class TimelineReaderServer extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TimelineReaderServer.class);
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
-  private static final String TIMELINE_READER_MANAGER_ATTR =
+  static final String TIMELINE_READER_MANAGER_ATTR =
       "timeline.reader.manager";
 
   private HttpServer2 readerWebServer;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index 3655a72..0b5fde0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -18,42 +18,283 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
 
 import com.google.inject.Singleton;
 
-
 /** REST end point for Timeline Reader */
 @Private
 @Unstable
 @Singleton
 @Path("/ws/v2/timeline")
 public class TimelineReaderWebServices {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineReaderWebServices.class);
+
+  @Context private ServletContext ctxt;
+
+  private static final String COMMA_DELIMITER = ",";
+  private static final String COLON_DELIMITER = ":";
 
   private void init(HttpServletResponse response) {
     response.setContentType(null);
   }
 
+  private static Set<String> parseValuesStr(String str, String delimiter) {
+    if (str == null || str.isEmpty()) {
+      return null;
+    }
+    Set<String> strSet = new HashSet<String>();
+    String[] strs = str.split(delimiter);
+    for (String aStr : strs) {
+      strSet.add(aStr.trim());
+    }
+    return strSet;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> void parseKeyValues(Map<String,T> map, String str,
+      String pairsDelim, String keyValuesDelim, boolean stringValue,
+      boolean multipleValues) {
+    String[] pairs = str.split(pairsDelim);
+    for (String pair : pairs) {
+      if (pair == null || pair.trim().isEmpty()) {
+        continue;
+      }
+      String[] pairStrs = pair.split(keyValuesDelim);
+      if (pairStrs.length < 2) {
+        continue;
+      }
+      if (!stringValue) {
+        try {
+          Object value =
+              GenericObjectMapper.OBJECT_READER.readValue(pairStrs[1].trim());
+          map.put(pairStrs[0].trim(), (T) value);
+        } catch (IOException e) {
+          map.put(pairStrs[0].trim(), (T) pairStrs[1].trim());
+        }
+      } else {
+        String key = pairStrs[0].trim();
+        if (multipleValues) {
+          Set<String> values = new HashSet<String>();
+          for (int i = 1; i < pairStrs.length; i++) {
+            values.add(pairStrs[i].trim());
+          }
+          map.put(key, (T) values);
+        } else {
+          map.put(key, (T) pairStrs[1].trim());
+        }
+      }
+    }
+  }
+
+  private static Map<String, Set<String>> parseKeyStrValuesStr(String str,
+      String pairsDelim, String keyValuesDelim) {
+    if (str == null) {
+      return null;
+    }
+    Map<String, Set<String>> map = new HashMap<String, Set<String>>();
+    parseKeyValues(map, str,pairsDelim, keyValuesDelim, true, true);
+    return map;
+  }
+
+  private static Map<String, String> parseKeyStrValueStr(String str,
+      String pairsDelim, String keyValDelim) {
+    if (str == null) {
+      return null;
+    }
+    Map<String, String> map = new HashMap<String, String>();
+    parseKeyValues(map, str, pairsDelim, keyValDelim, true, false);
+    return map;
+  }
+
+  private static Map<String, Object> parseKeyStrValueObj(String str,
+      String pairsDelim, String keyValDelim) {
+    if (str == null) {
+      return null;
+    }
+    Map<String, Object> map = new HashMap<String, Object>();
+    parseKeyValues(map, str, pairsDelim, keyValDelim, false, false);
+    return map;
+  }
+
+  private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    String[] strs = str.split(delimiter);
+    EnumSet<Field> fieldList = EnumSet.noneOf(Field.class);
+    for (String s : strs) {
+      fieldList.add(Field.valueOf(s.trim().toUpperCase()));
+    }
+    return fieldList;
+  }
+
+  private static Long parseLongStr(String str) {
+    return str == null ? null : Long.parseLong(str.trim());
+  }
+
+  private static String parseStr(String str) {
+    return str == null ? null : str.trim();
+  }
+
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
+  private TimelineReaderManager getTimelineReaderManager() {
+    return (TimelineReaderManager)
+        ctxt.getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR);
+  }
+
   /**
    * Return the description of the timeline reader web services.
    */
   @GET
-  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  @Produces(MediaType.APPLICATION_JSON)
   public TimelineAbout about(
       @Context HttpServletRequest req,
       @Context HttpServletResponse res) {
     init(res);
     return TimelineUtils.createTimelineAbout("Timeline Reader API");
   }
+
+  /**
+   * Return a set of entities that match the given parameters.
+   */
+  @GET
+  @Path("/entities/{clusterId}/{appId}/{entityType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("appId") String appId,
+      @PathParam("entityType") String entityType,
+      @QueryParam("userId") String userId,
+      @QueryParam("flowId") String flowId,
+      @QueryParam("flowRunId") String flowRunId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdTimeStart") String createdTimeStart,
+      @QueryParam("createdTimeEnd") String createdTimeEnd,
+      @QueryParam("modifiedTimeStart") String modifiedTimeStart,
+      @QueryParam("modifiedTimeEnd") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    UserGroupInformation callerUGI = getUser(req);
+    try {
+      return timelineReaderManager.getEntities(
+          callerUGI != null && (userId == null || userId.isEmpty()) ?
+          callerUGI.getUserName().trim() : parseStr(userId),
+          parseStr(clusterId), parseStr(flowId),
+          parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
+          parseLongStr(limit), parseLongStr(createdTimeStart),
+          parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
+          parseLongStr(modifiedTimeEnd),
+          parseKeyStrValuesStr(relatesTo, COMMA_DELIMITER, COLON_DELIMITER),
+          parseKeyStrValuesStr(isRelatedTo, COMMA_DELIMITER, COLON_DELIMITER),
+          parseKeyStrValueObj(infofilters, COMMA_DELIMITER, COLON_DELIMITER),
+          parseKeyStrValueStr(conffilters, COMMA_DELIMITER, COLON_DELIMITER),
+          parseValuesStr(metricfilters, COMMA_DELIMITER),
+          parseValuesStr(eventfilters, COMMA_DELIMITER),
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException(
+          "createdTime or modifiedTime start/end or limit or flowId is not" +
+          " a numeric value.");
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException("Requested Invalid Field.");
+    } catch (Exception e) {
+      LOG.error("Error getting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Return a single entity of the given entity type and Id.
+   */
+  @GET
+  @Path("/entity/{clusterId}/{appId}/{entityType}/{entityId}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getEntity(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("appId") String appId,
+      @PathParam("entityType") String entityType,
+      @PathParam("entityId") String entityId,
+      @QueryParam("userId") String userId,
+      @QueryParam("flowId") String flowId,
+      @QueryParam("flowRunId") String flowRunId,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    UserGroupInformation callerUGI = getUser(req);
+    TimelineEntity entity = null;
+    try {
+      entity = timelineReaderManager.getEntity(
+          callerUGI != null && (userId == null || userId.isEmpty()) ?
+          callerUGI.getUserName().trim() : parseStr(userId),
+          parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId),
+          parseStr(appId), parseStr(entityType), parseStr(entityId),
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException("flowRunId is not a numeric value.");
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException("Requested Invalid Field.");
+    } catch (Exception e) {
+      LOG.error("Error getting entity", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (entity == null) {
+      throw new NotFoundException("Timeline entity {id: " + parseStr(entityId) +
+          ", type: " + parseStr(entityType) + " } is not found");
+    }
+    return entity;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 45ddd1d..626c770 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -21,6 +21,7 @@
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.Charset;
@@ -397,6 +398,10 @@
                  new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
       TimelineEntity entity = readEntityFromFile(reader);
       return createEntityToBeReturned(entity, fieldsToRetrieve);
+    } catch (FileNotFoundException e) {
+      LOG.info("Cannot find entity {id:" + entityId + " , type:" + entityType +
+          "}. Will send HTTP 404 in response.");
+      return null;
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index a9145d0..0f7c22f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -18,25 +18,37 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
+import java.util.Set;
 
 import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.GenericType;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
@@ -46,12 +58,23 @@
   private int serverPort;
   private TimelineReaderServer server;
 
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestFileSystemTimelineReaderImpl.setup();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TestFileSystemTimelineReaderImpl.tearDown();
+  }
+
   @Before
   public void init() throws Exception {
     try {
       Configuration config = new YarnConfiguration();
       config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, 
           "localhost:0");
+      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
       server = new TimelineReaderServer();
       server.init(config);
       server.start();
@@ -69,6 +92,22 @@
     }
   }
 
+  private static TimelineEntity newEntity(String type, String id) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setIdentifier(new TimelineEntity.Identifier(type, id));
+    return entity;
+  }
+
+  private static void verifyHttpResponse(Client client, URI uri,
+      Status status) {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertNotNull(resp);
+    assertTrue("Response from server should have been " + status,
+        resp.getClientResponseStatus().equals(status));
+  }
+
   private static Client createClient() {
     ClientConfig cfg = new DefaultClientConfig();
     cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
@@ -76,14 +115,19 @@
         new DummyURLConnectionFactory()), cfg);
   }
 
-  private static ClientResponse getResponse(Client client, URI uri) throws Exception {
+  private static ClientResponse getResponse(Client client, URI uri)
+      throws Exception {
     ClientResponse resp =
         client.resource(uri).accept(MediaType.APPLICATION_JSON)
         .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
     if (resp == null ||
         resp.getClientResponseStatus() != ClientResponse.Status.OK) {
-       System.out.println(resp.getClientResponseStatus());
-      throw new IOException("Incorrect response from timeline reader.");
+      String msg = new String();
+      if (resp != null) {
+        msg = resp.getClientResponseStatus().toString();
+      }
+      throw new IOException("Incorrect response from timeline reader. " +
+          "Status=" + msg);
     }
     return resp;
   }
@@ -102,8 +146,7 @@
   }
 
   @Test
-  public void testAbout()
-      throws IOException {
+  public void testAbout() throws Exception {
     URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/");
     Client client = createClient();
     try {
@@ -111,9 +154,406 @@
       TimelineAbout about = resp.getEntity(TimelineAbout.class);
       Assert.assertNotNull(about);
       Assert.assertEquals("Timeline Reader API", about.getAbout());
-    } catch (Exception re) {
-      throw new IOException(
-          "Failed to get the response from timeline reader.", re);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntityDefaultView() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/cluster1/app1/app/id_1");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("id_1", entity.getId());
+      assertEquals("app", entity.getType());
+      assertEquals(1425016502000L, entity.getCreatedTime());
+      assertEquals(1425016503000L, entity.getModifiedTime());
+      // Default view i.e. when no fields are specified, entity contains only
+      // entity id, entity type, created and modified time.
+      assertEquals(0, entity.getConfigs().size());
+      assertEquals(0, entity.getMetrics().size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntityWithUserAndFlowInfo() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/cluster1/app1/app/id_1?userId=user1&" +
+          "flowId=flow1&flowRunId=1");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("id_1", entity.getId());
+      assertEquals("app", entity.getType());
+      assertEquals(1425016502000L, entity.getCreatedTime());
+      assertEquals(1425016503000L, entity.getModifiedTime());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntityCustomFields() throws Exception {
+    Client client = createClient();
+    try {
+      // Fields are case insensitive.
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/cluster1/app1/app/id_1?fields=CONFIGS,Metrics,info");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("id_1", entity.getId());
+      assertEquals("app", entity.getType());
+      assertEquals(3, entity.getConfigs().size());
+      assertEquals(3, entity.getMetrics().size());
+      assertEquals(1, entity.getInfo().size());
+      // No events will be returned as events are not part of fields.
+      assertEquals(0, entity.getEvents().size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/cluster1/app1/app/id_1?fields=ALL");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("id_1", entity.getId());
+      assertEquals("app", entity.getType());
+      assertEquals(3, entity.getConfigs().size());
+      assertEquals(3, entity.getMetrics().size());
+      assertEquals(1, entity.getInfo().size());
+      assertEquals(2, entity.getEvents().size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntityNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/cluster1/app1/app/id_10");
+      verifyHttpResponse(client, uri, Status.NOT_FOUND);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntities() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(4, entities.size());
+      assertTrue("Entities id_1, id_2, id_3 and id_4 should have been" +
+          " present in response",
+          entities.contains(newEntity("app", "id_1")) &&
+          entities.contains(newEntity("app", "id_2")) &&
+          entities.contains(newEntity("app", "id_3")) &&
+          entities.contains(newEntity("app", "id_4")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?limit=2");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      // Entities returned are based on most recent created time.
+      assertTrue("Entities with id_1 and id_4 should have been present " +
+          "in response based on entity created time.",
+          entities.contains(newEntity("app", "id_1")) &&
+          entities.contains(newEntity("app", "id_4")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?limit=3");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      // Even though 2 entities out of 4 have same created time, one entity
+      // is left out due to limit
+      assertEquals(3, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesBasedOnCreatedTime() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?createdTimeStart=1425016502030&"
+          + "createdTimeEnd=1425016502060");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_4 should have been present in response.",
+          entities.contains(newEntity("app", "id_4")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?createdTimeEnd=1425016502010");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+      assertFalse("Entity with id_4 should not have been present in response.",
+          entities.contains(newEntity("app", "id_4")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?createdTimeStart=1425016502010");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_4 should have been present in response.",
+          entities.contains(newEntity("app", "id_4")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesBasedOnModifiedTime() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?modifiedTimeStart=1425016502090"
+          + "&modifiedTimeEnd=1425016503020");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      assertTrue("Entities with id_1 and id_4 should have been" +
+          " present in response.",
+          entities.contains(newEntity("app", "id_1")) &&
+          entities.contains(newEntity("app", "id_4")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?modifiedTimeEnd=1425016502090");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      assertTrue("Entities with id_2 and id_3 should have been " +
+          "present in response.",
+          entities.contains(newEntity("app", "id_2")) &&
+          entities.contains(newEntity("app", "id_3")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?modifiedTimeStart=1425016503005");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_4 should have been present in response.",
+          entities.contains(newEntity("app", "id_4")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByRelations() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?relatesto=flow:flow1");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_1 should have been present in response.",
+          entities.contains(newEntity("app", "id_1")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?isrelatedto=type1:tid1_2,type2:" +
+          "tid2_1%60");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_1 should have been present in response.",
+          entities.contains(newEntity("app", "id_1")));
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app?isrelatedto=type1:tid1_1:tid1_2" +
+          ",type2:tid2_1%60");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_1 should have been present in response.",
+          entities.contains(newEntity("app", "id_1")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByConfigFilters() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?conffilters=config_1:123," +
+          "config_3:abc");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      assertTrue("Entities with id_1 and id_3 should have been present" +
+          " in response.",
+          entities.contains(newEntity("app", "id_1")) &&
+          entities.contains(newEntity("app", "id_3")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByInfoFilters() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?infofilters=info2:3.5");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_3 should have been present in response.",
+          entities.contains(newEntity("app", "id_3")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByMetricFilters() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?metricfilters=metric3");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      assertTrue("Entities with id_1 and id_2 should have been present" +
+          " in response.",
+          entities.contains(newEntity("app", "id_1")) &&
+          entities.contains(newEntity("app", "id_2")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByEventFilters() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?eventfilters=event_2,event_4");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertTrue("Entity with id_3 should have been present in response.",
+          entities.contains(newEntity("app", "id_3")));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetEntitiesNoMatch() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?metricfilters=metric7&" +
+          "isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" +
+          "flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" +
+          "&createdTimeStart=1425016502030&createdTimeEnd=1425016502060");
+      ClientResponse resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testInvalidValuesHandling() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/cluster1/app1/app?flowRunId=a23b");
+      verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entity/cluster1/app1/app/id_1?flowRunId=2ab15");
+      verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+          "entities/cluster1/app1/app/?limit=#$561av");
+      verifyHttpResponse(client, uri, Status.BAD_REQUEST);
     } finally {
       client.destroy();
     }