| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.timelineservice.reader; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.net.HttpURLConnection; |
| import java.net.URI; |
| import java.net.URL; |
| import java.text.DateFormat; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import javax.ws.rs.core.MediaType; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; |
| import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Sets; |
| import com.sun.jersey.api.client.Client; |
| import com.sun.jersey.api.client.ClientResponse; |
| import com.sun.jersey.api.client.ClientResponse.Status; |
| import com.sun.jersey.api.client.GenericType; |
| import com.sun.jersey.api.client.config.ClientConfig; |
| import com.sun.jersey.api.client.config.DefaultClientConfig; |
| import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; |
| import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; |
| |
| public class TestTimelineReaderWebServicesHBaseStorage { |
| private int serverPort; |
| private TimelineReaderServer server; |
| private static HBaseTestingUtility util; |
| private static long ts = System.currentTimeMillis(); |
| private static long dayTs = |
| TimelineStorageUtils.getTopOfTheDayTimestamp(ts); |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| util = new HBaseTestingUtility(); |
| Configuration conf = util.getConfiguration(); |
| conf.setInt("hfile.format.version", 3); |
| util.startMiniCluster(); |
| TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); |
| loadData(); |
| } |
| |
| private static void loadData() throws Exception { |
| String cluster = "cluster1"; |
| String user = "user1"; |
| String flow = "flow_name"; |
| String flowVersion = "CF7022C10F1354"; |
| Long runid = 1002345678919L; |
| Long runid1 = 1002345678920L; |
| |
| TimelineEntities te = new TimelineEntities(); |
| TimelineEntity entity = new TimelineEntity(); |
| String id = "application_1111111111_1111"; |
| String type = TimelineEntityType.YARN_APPLICATION.toString(); |
| entity.setId(id); |
| entity.setType(type); |
| Long cTime = 1425016501000L; |
| entity.setCreatedTime(cTime); |
| entity.addConfig("cfg2", "value1"); |
| |
| // add metrics |
| Set<TimelineMetric> metrics = new HashSet<>(); |
| TimelineMetric m1 = new TimelineMetric(); |
| m1.setId("MAP_SLOT_MILLIS"); |
| Map<Long, Number> metricValues = |
| ImmutableMap.of(ts - 100000, (Number)2, ts - 90000, 7, ts - 80000, 40); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| m1 = new TimelineMetric(); |
| m1.setId("MAP1_SLOT_MILLIS"); |
| metricValues = |
| ImmutableMap.of(ts - 100000, (Number)2, ts - 90000, 9, ts - 80000, 40); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| m1 = new TimelineMetric(); |
| m1.setId("HDFS_BYTES_READ"); |
| metricValues = ImmutableMap.of(ts - 100000, (Number)31, ts - 80000, 57); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| entity.addMetrics(metrics); |
| |
| TimelineEvent event = new TimelineEvent(); |
| event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); |
| event.setTimestamp(cTime); |
| String expKey = "foo_event"; |
| Object expVal = "test"; |
| event.addInfo(expKey, expVal); |
| entity.addEvent(event); |
| TimelineEvent event11 = new TimelineEvent(); |
| event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); |
| Long expTs = 1425019501000L; |
| event11.setTimestamp(expTs); |
| entity.addEvent(event11); |
| |
| te.addEntity(entity); |
| |
| // write another application with same metric to this flow |
| TimelineEntities te1 = new TimelineEntities(); |
| TimelineEntity entity1 = new TimelineEntity(); |
| id = "application_1111111111_2222"; |
| type = TimelineEntityType.YARN_APPLICATION.toString(); |
| entity1.setId(id); |
| entity1.setType(type); |
| cTime = 1425016501000L; |
| entity1.setCreatedTime(cTime); |
| entity1.addConfig("cfg1", "value1"); |
| // add metrics |
| metrics.clear(); |
| TimelineMetric m2 = new TimelineMetric(); |
| m2.setId("MAP_SLOT_MILLIS"); |
| metricValues = new HashMap<Long, Number>(); |
| metricValues.put(ts - 100000, 5L); |
| metricValues.put(ts - 80000, 101L); |
| m2.setType(Type.TIME_SERIES); |
| m2.setValues(metricValues); |
| metrics.add(m2); |
| entity1.addMetrics(metrics); |
| TimelineEvent event1 = new TimelineEvent(); |
| event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); |
| event1.setTimestamp(cTime); |
| event1.addInfo(expKey, expVal); |
| entity1.addEvent(event1); |
| te1.addEntity(entity1); |
| |
| String flow2 = "flow_name2"; |
| String flowVersion2 = "CF7022C10F1454"; |
| Long runid2 = 2102356789046L; |
| TimelineEntities te3 = new TimelineEntities(); |
| TimelineEntity entity3 = new TimelineEntity(); |
| id = "application_11111111111111_2223"; |
| entity3.setId(id); |
| entity3.setType(type); |
| cTime = 1425016501037L; |
| entity3.setCreatedTime(cTime); |
| TimelineEvent event2 = new TimelineEvent(); |
| event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); |
| event2.setTimestamp(cTime); |
| event2.addInfo("foo_event", "test"); |
| entity3.addEvent(event2); |
| te3.addEntity(entity3); |
| |
| TimelineEntities te4 = new TimelineEntities(); |
| TimelineEntity entity4 = new TimelineEntity(); |
| id = "application_1111111111_2224"; |
| entity4.setId(id); |
| entity4.setType(type); |
| cTime = 1425016501034L; |
| entity4.setCreatedTime(cTime); |
| TimelineEvent event4 = new TimelineEvent(); |
| event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); |
| event4.setTimestamp(cTime); |
| event4.addInfo("foo_event", "test"); |
| entity4.addEvent(event4); |
| metrics.clear(); |
| m2 = new TimelineMetric(); |
| m2.setId("MAP_SLOT_MILLIS"); |
| metricValues = ImmutableMap.of(ts - 100000, (Number)5L, ts - 80000, 101L); |
| m2.setType(Type.TIME_SERIES); |
| m2.setValues(metricValues); |
| metrics.add(m2); |
| entity4.addMetrics(metrics); |
| te4.addEntity(entity4); |
| |
| TimelineEntities te5 = new TimelineEntities(); |
| TimelineEntity entity5 = new TimelineEntity(); |
| entity5.setId("entity1"); |
| entity5.setType("type1"); |
| entity5.setCreatedTime(1425016501034L); |
| // add some config entries |
| entity5.addConfigs(ImmutableMap.of("config_param1", "value1", |
| "config_param2", "value2", "cfg_param1", "value3")); |
| entity5.addInfo(ImmutableMap.of("info1", (Object)"cluster1", |
| "info2", 2.0, "info3", 35000, "info4", 36000)); |
| metrics = new HashSet<>(); |
| m1 = new TimelineMetric(); |
| m1.setId("MAP_SLOT_MILLIS"); |
| metricValues = ImmutableMap.of(ts - 100000, (Number)2, ts - 80000, 40); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| m1 = new TimelineMetric(); |
| m1.setId("HDFS_BYTES_READ"); |
| metricValues = ImmutableMap.of(ts - 100000, (Number)31, ts - 80000, 57); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| entity5.addMetrics(metrics); |
| TimelineEvent event51 = new TimelineEvent(); |
| event51.setId("event1"); |
| event51.setTimestamp(cTime); |
| entity5.addEvent(event51); |
| TimelineEvent event52 = new TimelineEvent(); |
| event52.setId("event2"); |
| event52.setTimestamp(cTime); |
| entity5.addEvent(event52); |
| TimelineEvent event53 = new TimelineEvent(); |
| event53.setId("event3"); |
| event53.setTimestamp(cTime); |
| entity5.addEvent(event53); |
| TimelineEvent event54 = new TimelineEvent(); |
| event54.setId("event4"); |
| event54.setTimestamp(cTime); |
| entity5.addEvent(event54); |
| Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>(); |
| isRelatedTo1.put("type2", |
| Sets.newHashSet("entity21","entity22","entity23","entity24")); |
| isRelatedTo1.put("type4", Sets.newHashSet("entity41","entity42")); |
| isRelatedTo1.put("type1", Sets.newHashSet("entity14","entity15")); |
| isRelatedTo1.put("type3", |
| Sets.newHashSet("entity31", "entity35", "entity32", "entity33")); |
| entity5.addIsRelatedToEntities(isRelatedTo1); |
| Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>(); |
| relatesTo1.put("type2", |
| Sets.newHashSet("entity21","entity22","entity23","entity24")); |
| relatesTo1.put("type4", Sets.newHashSet("entity41","entity42")); |
| relatesTo1.put("type1", Sets.newHashSet("entity14","entity15")); |
| relatesTo1.put("type3", |
| Sets.newHashSet("entity31", "entity35", "entity32", "entity33")); |
| entity5.addRelatesToEntities(relatesTo1); |
| te5.addEntity(entity5); |
| |
| TimelineEntity entity6 = new TimelineEntity(); |
| entity6.setId("entity2"); |
| entity6.setType("type1"); |
| entity6.setCreatedTime(1425016501034L); |
| entity6.addConfigs(ImmutableMap.of("cfg_param3", "value1", |
| "configuration_param2", "value2", "config_param1", "value3")); |
| entity6.addInfo(ImmutableMap.of("info1", (Object)"cluster2", |
| "info2", 2.0, "info4", 35000)); |
| metrics = new HashSet<>(); |
| m1 = new TimelineMetric(); |
| m1.setId("MAP1_SLOT_MILLIS"); |
| metricValues = ImmutableMap.of(ts - 100000, (Number)12, ts - 80000, 140); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| m1 = new TimelineMetric(); |
| m1.setId("HDFS_BYTES_READ"); |
| metricValues = ImmutableMap.of(ts - 100000, (Number)78, ts - 80000, 157); |
| m1.setType(Type.TIME_SERIES); |
| m1.setValues(metricValues); |
| metrics.add(m1); |
| m1 = new TimelineMetric(); |
| m1.setId("MAP11_SLOT_MILLIS"); |
| m1.setType(Type.SINGLE_VALUE); |
| m1.addValue(ts - 100000, 122); |
| metrics.add(m1); |
| entity6.addMetrics(metrics); |
| TimelineEvent event61 = new TimelineEvent(); |
| event61.setId("event1"); |
| event61.setTimestamp(cTime); |
| entity6.addEvent(event61); |
| TimelineEvent event62 = new TimelineEvent(); |
| event62.setId("event5"); |
| event62.setTimestamp(cTime); |
| entity6.addEvent(event62); |
| TimelineEvent event63 = new TimelineEvent(); |
| event63.setId("event3"); |
| event63.setTimestamp(cTime); |
| entity6.addEvent(event63); |
| TimelineEvent event64 = new TimelineEvent(); |
| event64.setId("event6"); |
| event64.setTimestamp(cTime); |
| entity6.addEvent(event64); |
| Map<String, Set<String>> isRelatedTo2 = new HashMap<String, Set<String>>(); |
| isRelatedTo2.put("type2", |
| Sets.newHashSet("entity21","entity22","entity23","entity24")); |
| isRelatedTo2.put("type5", Sets.newHashSet("entity51","entity52")); |
| isRelatedTo2.put("type6", Sets.newHashSet("entity61","entity66")); |
| isRelatedTo2.put("type3", Sets.newHashSet("entity31")); |
| entity6.addIsRelatedToEntities(isRelatedTo2); |
| Map<String, Set<String>> relatesTo2 = new HashMap<String, Set<String>>(); |
| relatesTo2.put("type2", |
| Sets.newHashSet("entity21","entity22","entity23","entity24")); |
| relatesTo2.put("type5", Sets.newHashSet("entity51","entity52")); |
| relatesTo2.put("type6", Sets.newHashSet("entity61","entity66")); |
| relatesTo2.put("type3", Sets.newHashSet("entity31")); |
| entity6.addRelatesToEntities(relatesTo2); |
| te5.addEntity(entity6); |
| |
| HBaseTimelineWriterImpl hbi = null; |
| Configuration c1 = util.getConfiguration(); |
| try { |
| hbi = new HBaseTimelineWriterImpl(c1); |
| hbi.init(c1); |
| hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te); |
| hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1); |
| hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); |
| hbi.write(cluster, user, flow2, |
| flowVersion2, runid2, entity3.getId(), te3); |
| hbi.write(cluster, user, flow, flowVersion, runid, |
| "application_1111111111_1111", te5); |
| hbi.flush(); |
| } finally { |
| if (hbi != null) { |
| hbi.close(); |
| } |
| } |
| } |
| |
| @AfterClass |
| public static void tearDown() throws Exception { |
| util.shutdownMiniCluster(); |
| } |
| |
| @Before |
| public void init() throws Exception { |
| try { |
| Configuration config = util.getConfiguration(); |
| config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); |
| config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); |
| config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, |
| "localhost:0"); |
| config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); |
| config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, |
| "org.apache.hadoop.yarn.server.timelineservice.storage." + |
| "HBaseTimelineReaderImpl"); |
| config.setInt("hfile.format.version", 3); |
| server = new TimelineReaderServer(); |
| server.init(config); |
| server.start(); |
| serverPort = server.getWebServerPort(); |
| } catch (Exception e) { |
| Assert.fail("Web server failed to start"); |
| } |
| } |
| |
| private static Client createClient() { |
| ClientConfig cfg = new DefaultClientConfig(); |
| cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); |
| return new Client(new URLConnectionClientHandler( |
| new DummyURLConnectionFactory()), cfg); |
| } |
| |
| private static ClientResponse getResponse(Client client, URI uri) |
| throws Exception { |
| ClientResponse resp = |
| client.resource(uri).accept(MediaType.APPLICATION_JSON) |
| .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); |
| if (resp == null || |
| resp.getClientResponseStatus() != ClientResponse.Status.OK) { |
| String msg = ""; |
| if (resp != null) { |
| msg = resp.getClientResponseStatus().toString(); |
| } |
| throw new IOException("Incorrect response from timeline reader. " + |
| "Status=" + msg); |
| } |
| return resp; |
| } |
| |
| private static class DummyURLConnectionFactory |
| implements HttpURLConnectionFactory { |
| |
| @Override |
| public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { |
| try { |
| return (HttpURLConnection)url.openConnection(); |
| } catch (UndeclaredThrowableException e) { |
| throw new IOException(e.getCause()); |
| } |
| } |
| } |
| |
| private static TimelineEntity newEntity(String type, String id) { |
| TimelineEntity entity = new TimelineEntity(); |
| entity.setIdentifier(new TimelineEntity.Identifier(type, id)); |
| return entity; |
| } |
| |
| private static TimelineMetric newMetric(TimelineMetric.Type type, |
| String id, long ts, Number value) { |
| TimelineMetric metric = new TimelineMetric(type); |
| metric.setId(id); |
| metric.addValue(ts, value); |
| return metric; |
| } |
| |
| private static boolean verifyMetricValues(Map<Long, Number> m1, |
| Map<Long, Number> m2) { |
| for (Map.Entry<Long, Number> entry : m1.entrySet()) { |
| if (!m2.containsKey(entry.getKey())) { |
| return false; |
| } |
| if (m2.get(entry.getKey()).equals(entry.getValue())) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private static boolean verifyMetrics( |
| TimelineMetric m, TimelineMetric... metrics) { |
| for (TimelineMetric metric : metrics) { |
| if (!metric.equals(m)) { |
| continue; |
| } |
| if (!verifyMetricValues(metric.getValues(), m.getValues())) { |
| continue; |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| private static void verifyHttpResponse(Client client, URI uri, |
| Status status) { |
| ClientResponse resp = |
| client.resource(uri).accept(MediaType.APPLICATION_JSON) |
| .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); |
| assertNotNull(resp); |
| assertTrue("Response from server should have been " + status, |
| resp.getClientResponseStatus().equals(status)); |
| System.out.println("Response is: " + resp.getEntity(String.class)); |
| } |
| |
| @Test |
| public void testGetFlowRun() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + |
| "1002345678919"); |
| ClientResponse resp = getResponse(client, uri); |
| FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entity); |
| assertEquals("user1@flow_name/1002345678919", entity.getId()); |
| assertEquals(3, entity.getMetrics().size()); |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "HDFS_BYTES_READ", ts - 80000, 57L); |
| TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 141L); |
| TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP1_SLOT_MILLIS", ts - 80000, 40L); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } |
| |
| // Query without specifying cluster ID. |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/users/user1/flows/flow_name/runs/1002345678919"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(FlowRunEntity.class); |
| assertNotNull(entity); |
| assertEquals("user1@flow_name/1002345678919", entity.getId()); |
| assertEquals(3, entity.getMetrics().size()); |
| m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "HDFS_BYTES_READ", ts - 80000, 57L); |
| m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 141L); |
| m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP1_SLOT_MILLIS", ts - 80000, 40L); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowRuns() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<FlowRunEntity> entities = |
| resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (FlowRunEntity entity : entities) { |
| assertTrue("Id, run id or start time does not match.", |
| ((entity.getId().equals("user1@flow_name/1002345678919")) && |
| (entity.getRunId() == 1002345678919L) && |
| (entity.getStartTime() == 1425016501000L)) || |
| ((entity.getId().equals("user1@flow_name/1002345678920")) && |
| (entity.getRunId() == 1002345678920L) && |
| (entity.getStartTime() == 1425016501034L))); |
| assertEquals(0, entity.getMetrics().size()); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "clusters/cluster1/users/user1/flows/flow_name/runs?limit=1"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (FlowRunEntity entity : entities) { |
| assertTrue("Id, run id or start time does not match.", |
| entity.getId().equals("user1@flow_name/1002345678920") && |
| entity.getRunId() == 1002345678920L && |
| entity.getStartTime() == 1425016501034L); |
| assertEquals(0, entity.getMetrics().size()); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "createdtimestart=1425016501030"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (FlowRunEntity entity : entities) { |
| assertTrue("Id, run id or start time does not match.", |
| entity.getId().equals("user1@flow_name/1002345678920") && |
| entity.getRunId() == 1002345678920L && |
| entity.getStartTime() == 1425016501034L); |
| assertEquals(0, entity.getMetrics().size()); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "createdtimestart=1425016500999&createdtimeend=1425016501035"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (FlowRunEntity entity : entities) { |
| assertTrue("Id, run id or start time does not match.", |
| ((entity.getId().equals("user1@flow_name/1002345678919")) && |
| (entity.getRunId() == 1002345678919L) && |
| (entity.getStartTime() == 1425016501000L)) || |
| ((entity.getId().equals("user1@flow_name/1002345678920")) && |
| (entity.getRunId() == 1002345678920L) && |
| (entity.getStartTime() == 1425016501034L))); |
| assertEquals(0, entity.getMetrics().size()); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "createdtimeend=1425016501030"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (FlowRunEntity entity : entities) { |
| assertTrue("Id, run id or start time does not match.", |
| entity.getId().equals("user1@flow_name/1002345678919") && |
| entity.getRunId() == 1002345678919L && |
| entity.getStartTime() == 1425016501000L); |
| assertEquals(0, entity.getMetrics().size()); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "fields=metrics"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (FlowRunEntity entity : entities) { |
| assertTrue("Id, run id or start time does not match.", |
| ((entity.getId().equals("user1@flow_name/1002345678919")) && |
| (entity.getRunId() == 1002345678919L) && |
| (entity.getStartTime() == 1425016501000L) && |
| (entity.getMetrics().size() == 3)) || |
| ((entity.getId().equals("user1@flow_name/1002345678920")) && |
| (entity.getRunId() == 1002345678920L) && |
| (entity.getStartTime() == 1425016501034L) && |
| (entity.getMetrics().size() == 1))); |
| } |
| |
| // fields as CONFIGS will lead to a HTTP 400 as it makes no sense for |
| // flow runs. |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "fields=CONFIGS"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowRunsMetricsToRetrieve() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "metricstoretrieve=MAP_,HDFS_"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<FlowRunEntity> entities = |
| resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| int metricCnt = 0; |
| for (FlowRunEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP_") || |
| metric.getId().startsWith("HDFS_")); |
| } |
| } |
| assertEquals(3, metricCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + |
| "metricstoretrieve=!(MAP_,HDFS_)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| metricCnt = 0; |
| for (FlowRunEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP1_")); |
| } |
| } |
| assertEquals(1, metricCnt); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesByUID() throws Exception { |
| Client client = createClient(); |
| try { |
| // Query all flows. |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/flows"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<FlowActivityEntity> flowEntities = |
| resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(flowEntities); |
| assertEquals(2, flowEntities.size()); |
| List<String> listFlowUIDs = new ArrayList<String>(); |
| for (FlowActivityEntity entity : flowEntities) { |
| String flowUID = |
| (String)entity.getInfo().get(TimelineReaderManager.UID_KEY); |
| listFlowUIDs.add(flowUID); |
| assertEquals(TimelineUIDConverter.FLOW_UID.encodeUID( |
| new TimelineReaderContext(entity.getCluster(), entity.getUser(), |
| entity.getFlowName(), null, null, null, null)), flowUID); |
| assertTrue((entity.getId().endsWith("@flow_name") && |
| entity.getFlowRuns().size() == 2) || |
| (entity.getId().endsWith("@flow_name2") && |
| entity.getFlowRuns().size() == 1)); |
| } |
| |
| // Query flowruns based on UID returned in query above. |
| List<String> listFlowRunUIDs = new ArrayList<String>(); |
| for (String flowUID : listFlowUIDs) { |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/flow-uid/" + flowUID + "/runs"); |
| resp = getResponse(client, uri); |
| Set<FlowRunEntity> frEntities = |
| resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); |
| assertNotNull(frEntities); |
| for (FlowRunEntity entity : frEntities) { |
| String flowRunUID = |
| (String)entity.getInfo().get(TimelineReaderManager.UID_KEY); |
| listFlowRunUIDs.add(flowRunUID); |
| assertEquals(TimelineUIDConverter.FLOWRUN_UID.encodeUID( |
| new TimelineReaderContext("cluster1", entity.getUser(), |
| entity.getName(), entity.getRunId(), null, null, null)), |
| flowRunUID); |
| } |
| } |
| assertEquals(3, listFlowRunUIDs.size()); |
| |
| // Query single flowrun based on UIDs' returned in query to get flowruns. |
| for (String flowRunUID : listFlowRunUIDs) { |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/run-uid/" + flowRunUID); |
| resp = getResponse(client, uri); |
| FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); |
| assertNotNull(entity); |
| } |
| |
| // Query apps based on UIDs' returned in query to get flowruns. |
| List<String> listAppUIDs = new ArrayList<String>(); |
| for (String flowRunUID : listFlowRunUIDs) { |
| TimelineReaderContext context = |
| TimelineUIDConverter.FLOWRUN_UID.decodeUID(flowRunUID); |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/run-uid/" + flowRunUID + "/apps"); |
| resp = getResponse(client, uri); |
| Set<TimelineEntity> appEntities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(appEntities); |
| for (TimelineEntity entity : appEntities) { |
| String appUID = |
| (String)entity.getInfo().get(TimelineReaderManager.UID_KEY); |
| listAppUIDs.add(appUID); |
| assertEquals(TimelineUIDConverter.APPLICATION_UID.encodeUID( |
| new TimelineReaderContext(context.getClusterId(), |
| context.getUserId(), context.getFlowName(), |
| context.getFlowRunId(), entity.getId(), null, null)), appUID); |
| } |
| } |
| assertEquals(4, listAppUIDs.size()); |
| |
| // Query single app based on UIDs' returned in query to get apps. |
| for (String appUID : listAppUIDs) { |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/app-uid/" + appUID); |
| resp = getResponse(client, uri); |
| TimelineEntity entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| } |
| |
| // Query entities based on UIDs' returned in query to get apps and |
| // a specific entity type(in this case type1). |
| List<String> listEntityUIDs = new ArrayList<String>(); |
| for (String appUID : listAppUIDs) { |
| TimelineReaderContext context = |
| TimelineUIDConverter.APPLICATION_UID.decodeUID(appUID); |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/app-uid/" + appUID + "/entities/type1"); |
| resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| for (TimelineEntity entity : entities) { |
| String entityUID = |
| (String)entity.getInfo().get(TimelineReaderManager.UID_KEY); |
| listEntityUIDs.add(entityUID); |
| assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID( |
| new TimelineReaderContext(context.getClusterId(), |
| context.getUserId(), context.getFlowName(), |
| context.getFlowRunId(), context.getAppId(), "type1", |
| entity.getId())), entityUID); |
| } |
| } |
| assertEquals(2, listEntityUIDs.size()); |
| |
| // Query single entity based on UIDs' returned in query to get entities. |
| for (String entityUID : listEntityUIDs) { |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/entity-uid/" + entityUID); |
| resp = getResponse(client, uri); |
| TimelineEntity entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/flow-uid/dummy:flow/runs"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/run-uid/dummy:flowrun"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| // Run Id is not a numerical value. |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/run-uid/some:dummy:flow:123v456"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/run-uid/dummy:flowrun/apps"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/app-uid/dummy:app"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/app-uid/dummy:app/entities/type1"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/entity-uid/dummy:entity"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { |
| Client client = createClient(); |
| try { |
| String appUIDWithFlowInfo = |
| "cluster1!user1!flow_name!1002345678919!application_1111111111_1111"; |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+ |
| "timeline/app-uid/" + appUIDWithFlowInfo); |
| ClientResponse resp = getResponse(client, uri); |
| TimelineEntity appEntity1 = resp.getEntity(TimelineEntity.class); |
| assertNotNull(appEntity1); |
| assertEquals( |
| TimelineEntityType.YARN_APPLICATION.toString(), appEntity1.getType()); |
| assertEquals("application_1111111111_1111", appEntity1.getId()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "app-uid/" + appUIDWithFlowInfo + "/entities/type1"); |
| resp = getResponse(client, uri); |
| Set<TimelineEntity> entities1 = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities1); |
| assertEquals(2, entities1.size()); |
| for (TimelineEntity entity : entities1) { |
| assertNotNull(entity.getInfo()); |
| assertEquals(1, entity.getInfo().size()); |
| String uid = |
| (String) entity.getInfo().get(TimelineReaderManager.UID_KEY); |
| assertNotNull(uid); |
| assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!entity1") || |
| uid.equals(appUIDWithFlowInfo + "!type1!entity2")); |
| } |
| |
| String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111"; |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ |
| "app-uid/" + appUIDWithoutFlowInfo); |
| resp = getResponse(client, uri);; |
| TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class); |
| assertNotNull(appEntity2); |
| assertEquals( |
| TimelineEntityType.YARN_APPLICATION.toString(), appEntity2.getType()); |
| assertEquals("application_1111111111_1111", appEntity2.getId()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "app-uid/" + appUIDWithoutFlowInfo + "/entities/type1"); |
| resp = getResponse(client, uri); |
| Set<TimelineEntity> entities2 = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities2); |
| assertEquals(2, entities2.size()); |
| for (TimelineEntity entity : entities2) { |
| assertNotNull(entity.getInfo()); |
| assertEquals(1, entity.getInfo().size()); |
| String uid = |
| (String) entity.getInfo().get(TimelineReaderManager.UID_KEY); |
| assertNotNull(uid); |
| assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") || |
| uid.equals(appUIDWithoutFlowInfo + "!type1!entity2")); |
| } |
| |
| String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1"; |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ |
| "entity-uid/" + entityUIDWithFlowInfo); |
| resp = getResponse(client, uri);; |
| TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class); |
| assertNotNull(singleEntity1); |
| assertEquals("type1", singleEntity1.getType()); |
| assertEquals("entity1", singleEntity1.getId()); |
| |
| String entityUIDWithoutFlowInfo = |
| appUIDWithoutFlowInfo + "!type1!entity1"; |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ |
| "entity-uid/" + entityUIDWithoutFlowInfo); |
| resp = getResponse(client, uri);; |
| TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class); |
| assertNotNull(singleEntity2); |
| assertEquals("type1", singleEntity2.getType()); |
| assertEquals("entity1", singleEntity2.getId()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testUIDNotProperlyEscaped() throws Exception { |
| Client client = createClient(); |
| try { |
| String appUID = |
| "cluster1!user*1!flow_name!1002345678919!application_1111111111_1111"; |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+ |
| "timeline/app-uid/" + appUID); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlows() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<FlowActivityEntity> entities = |
| resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (FlowActivityEntity entity : entities) { |
| assertTrue((entity.getId().endsWith("@flow_name") && |
| entity.getFlowRuns().size() == 2) || |
| (entity.getId().endsWith("@flow_name2") && |
| entity.getFlowRuns().size() == 1)); |
| } |
| |
| // Query without specifying cluster ID. |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/flows/"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?limit=1"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| |
| long firstFlowActivity = |
| TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); |
| |
| DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get(); |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=" |
| + fmt.format(firstFlowActivity) + "-" |
| + fmt.format(dayTs)); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (FlowActivityEntity entity : entities) { |
| assertTrue((entity.getId().endsWith("@flow_name") && |
| entity.getFlowRuns().size() == 2) || |
| (entity.getId().endsWith("@flow_name2") && |
| entity.getFlowRuns().size() == 1)); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=" + |
| fmt.format(dayTs + (4*86400000L))); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=-" + |
| fmt.format(dayTs)); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=" + |
| fmt.format(firstFlowActivity) + "-"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=20150711:20150714"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=20150714-20150711"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=2015071129-20150712"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/flows?daterange=20150711-2015071243"); |
| verifyHttpResponse(client, uri, Status.BAD_REQUEST); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetApp() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111?" + |
| "userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919"); |
| ClientResponse resp = getResponse(client, uri); |
| TimelineEntity entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("application_1111111111_1111", entity.getId()); |
| assertEquals(3, entity.getMetrics().size()); |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "HDFS_BYTES_READ", ts - 80000, 57L); |
| TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 40L); |
| TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP1_SLOT_MILLIS", ts - 80000, 40L); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/apps/application_1111111111_2222?userid=user1" + |
| "&fields=metrics&flowname=flow_name&flowrunid=1002345678919"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("application_1111111111_2222", entity.getId()); |
| assertEquals(1, entity.getMetrics().size()); |
| TimelineMetric m4 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 101L); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(verifyMetrics(metric, m4)); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetAppWithoutFlowInfo() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111?" + |
| "fields=ALL"); |
| ClientResponse resp = getResponse(client, uri); |
| TimelineEntity entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("application_1111111111_1111", entity.getId()); |
| assertEquals(1, entity.getConfigs().size()); |
| assertEquals(3, entity.getMetrics().size()); |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "HDFS_BYTES_READ", ts - 80000, 57L); |
| TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 40L); |
| TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP1_SLOT_MILLIS", ts - 80000, 40L); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111?" + |
| "fields=ALL&metricslimit=10"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("application_1111111111_1111", entity.getId()); |
| assertEquals(1, entity.getConfigs().size()); |
| assertEquals(3, entity.getMetrics().size()); |
| m1 = newMetric(TimelineMetric.Type.TIME_SERIES, "HDFS_BYTES_READ", |
| ts - 100000, 31L); |
| m1.addValue(ts - 80000, 57L); |
| m2 = newMetric(TimelineMetric.Type.TIME_SERIES, "MAP_SLOT_MILLIS", |
| ts - 100000, 2L); |
| m2.addValue(ts - 80000, 40L); |
| m3 = newMetric(TimelineMetric.Type.TIME_SERIES, "MAP1_SLOT_MILLIS", |
| ts - 100000, 2L); |
| m3.addValue(ts - 80000, 40L); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntityWithoutFlowInfo() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1/entity1"); |
| ClientResponse resp = getResponse(client, uri); |
| TimelineEntity entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("entity1", entity.getId()); |
| assertEquals("type1", entity.getType()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesWithoutFlowInfo() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| /** |
| * Tests if specific configs and metrics are retrieve for getEntities call. |
| */ |
| @Test |
| public void testGetEntitiesDataToRetrieve() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?confstoretrieve=cfg_"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| int cfgCnt = 0; |
| for (TimelineEntity entity : entities) { |
| cfgCnt += entity.getConfigs().size(); |
| for (String configKey : entity.getConfigs().keySet()) { |
| assertTrue(configKey.startsWith("cfg_")); |
| } |
| } |
| assertEquals(2, cfgCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?confstoretrieve=cfg_,config_"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| cfgCnt = 0; |
| for (TimelineEntity entity : entities) { |
| cfgCnt += entity.getConfigs().size(); |
| for (String configKey : entity.getConfigs().keySet()) { |
| assertTrue(configKey.startsWith("cfg_") || |
| configKey.startsWith("config_")); |
| } |
| } |
| assertEquals(5, cfgCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?confstoretrieve=!(cfg_,config_)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| cfgCnt = 0; |
| for (TimelineEntity entity : entities) { |
| cfgCnt += entity.getConfigs().size(); |
| for (String configKey : entity.getConfigs().keySet()) { |
| assertTrue(configKey.startsWith("configuration_")); |
| } |
| } |
| assertEquals(1, cfgCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricstoretrieve=MAP_"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| int metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP_")); |
| } |
| } |
| assertEquals(1, metricCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricstoretrieve=MAP1_,HDFS_"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP1_") || |
| metric.getId().startsWith("HDFS_")); |
| } |
| } |
| assertEquals(3, metricCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricstoretrieve=!(MAP1_,HDFS_)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP_") || |
| metric.getId().startsWith("MAP11_")); |
| } |
| } |
| assertEquals(2, metricCnt); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesConfigFilters() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=config_param1%20eq%20value1%20OR%20" + |
| "config_param1%20eq%20value3"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=config_param1%20eq%20value1%20AND" + |
| "%20configuration_param2%20eq%20value2"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| |
| // conffilters=(config_param1 eq value1 AND configuration_param2 eq |
| // value2) OR (config_param1 eq value3 AND cfg_param3 eq value1) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" + |
| "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + |
| "%20value3%20AND%20cfg_param3%20eq%20value1)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| int cfgCnt = 0; |
| for (TimelineEntity entity : entities) { |
| cfgCnt += entity.getConfigs().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| assertEquals(0, cfgCnt); |
| |
| // conffilters=(config_param1 eq value1 AND configuration_param2 eq |
| // value2) OR (config_param1 eq value3 AND cfg_param3 eq value1) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" + |
| "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + |
| "%20value3%20AND%20cfg_param3%20eq%20value1)&fields=CONFIGS"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| cfgCnt = 0; |
| for (TimelineEntity entity : entities) { |
| cfgCnt += entity.getConfigs().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| assertEquals(3, cfgCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" + |
| "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + |
| "%20value3%20AND%20cfg_param3%20eq%20value1)&confstoretrieve=cfg_," + |
| "configuration_"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| cfgCnt = 0; |
| for (TimelineEntity entity : entities) { |
| cfgCnt += entity.getConfigs().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| for (String configKey : entity.getConfigs().keySet()) { |
| assertTrue(configKey.startsWith("cfg_") || |
| configKey.startsWith("configuration_")); |
| } |
| } |
| assertEquals(2, cfgCnt); |
| |
| // Test for behavior when compare op is ne(not equals) vs ene |
| // (exists and not equals). configuration_param2 does not exist for |
| // entity1. For ne, both entity1 and entity2 will be returned. For ene, |
| // only entity2 will be returned as we are checking for existence too. |
| // conffilters=configuration_param2 ne value3 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=configuration_param2%20ne%20value3"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| // conffilters=configuration_param2 ene value3 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?conffilters=configuration_param2%20ene%20value3"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesInfoFilters() throws Exception { |
| Client client = createClient(); |
| try { |
| // infofilters=info1 eq cluster1 OR info1 eq cluster2 |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=info1%20eq%20cluster1%20OR%20info1%20eq" + |
| "%20cluster2"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| // infofilters=info1 eq cluster1 AND info4 eq 35000 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=info1%20eq%20cluster1%20AND%20info4%20" + |
| "eq%2035000"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| |
| // infofilters=info4 eq 35000 OR info4 eq 36000 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=info4%20eq%2035000%20OR%20info4%20eq" + |
| "%2036000"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| // infofilters=(info1 eq cluster1 AND info4 eq 35000) OR |
| // (info1 eq cluster2 AND info2 eq 2.0) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" + |
| "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| int infoCnt = 0; |
| for (TimelineEntity entity : entities) { |
| infoCnt += entity.getInfo().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| // Includes UID in info field even if fields not specified as INFO. |
| assertEquals(1, infoCnt); |
| |
| // infofilters=(info1 eq cluster1 AND info4 eq 35000) OR |
| // (info1 eq cluster2 AND info2 eq 2.0) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" + |
| "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%20" + |
| "2.0)&fields=INFO"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| infoCnt = 0; |
| for (TimelineEntity entity : entities) { |
| infoCnt += entity.getInfo().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| // Includes UID in info field. |
| assertEquals(4, infoCnt); |
| |
| // Test for behavior when compare op is ne(not equals) vs ene |
| // (exists and not equals). info3 does not exist for entity2. For ne, |
| // both entity1 and entity2 will be returned. For ene, only entity2 will |
| // be returned as we are checking for existence too. |
| // infofilters=info3 ne 39000 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=info3%20ne%2039000"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| // infofilters=info3 ene 39000 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?infofilters=info3%20ene%2039000"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1")); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesMetricFilters() throws Exception { |
| Client client = createClient(); |
| try { |
| // metricfilters=HDFS_BYTES_READ lt 60 OR HDFS_BYTES_READ eq 157 |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20OR%20" + |
| "HDFS_BYTES_READ%20eq%20157"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| // metricfilters=HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20AND%20" + |
| "MAP_SLOT_MILLIS%20gt%2040"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| |
| // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR |
| // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + |
| "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + |
| "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| int metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| assertEquals(0, metricCnt); |
| |
| // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR |
| // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + |
| "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + |
| "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&fields=METRICS"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| assertEquals(3, metricCnt); |
| |
| // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR |
| // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + |
| "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + |
| "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" + |
| "!(HDFS)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP1")); |
| assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType()); |
| } |
| } |
| assertEquals(2, metricCnt); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + |
| "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + |
| "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" + |
| "!(HDFS)&metricslimit=10"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| metricCnt = 0; |
| for (TimelineEntity entity : entities) { |
| metricCnt += entity.getMetrics().size(); |
| assertTrue(entity.getId().equals("entity2")); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP1")); |
| if (metric.getId().equals("MAP1_SLOT_MILLIS")) { |
| assertEquals(2, metric.getValues().size()); |
| assertEquals(TimelineMetric.Type.TIME_SERIES, metric.getType()); |
| } else if (metric.getId().equals("MAP11_SLOT_MILLIS")) { |
| assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType()); |
| } else { |
| fail("Unexpected metric id"); |
| } |
| } |
| } |
| assertEquals(2, metricCnt); |
| |
| // Test for behavior when compare op is ne(not equals) vs ene |
| // (exists and not equals). MAP11_SLOT_MILLIS does not exist for |
| // entity1. For ne, both entity1 and entity2 will be returned. For ene, |
| // only entity2 will be returned as we are checking for existence too. |
| // metricfilters=MAP11_SLOT_MILLIS ne 100 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ne%20100"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| // metricfilters=MAP11_SLOT_MILLIS ene 100 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ene%20100"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesEventFilters() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?eventfilters=event1,event3"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?eventfilters=!(event1,event3)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| |
| // eventfilters=!(event1,event3) OR event5,event6 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?eventfilters=!(event1,event3)%20OR%20event5,event6"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| |
| // eventfilters=(!(event1,event3) OR event5,event6) OR |
| // (event1,event2 AND (event3,event4)) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?eventfilters=(!(event1,event3)%20OR%20event5," + |
| "event6)%20OR%20(event1,event2%20AND%20(event3,event4))"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetEntitiesRelationFilters() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1?isrelatedto=type3:entity31,type2:entity21:entity22"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + |
| "?isrelatedto=!(type3:entity31,type2:entity21:entity22)"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| |
| // isrelatedto=!(type3:entity31,type2:entity21:entity22)OR type5:entity51, |
| // type6:entity61:entity66 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + |
| "?isrelatedto=!(type3:entity31,type2:entity21:entity22)%20OR%20" + |
| "type5:entity51,type6:entity61:entity66"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| |
| // isrelatedto=(!(type3:entity31,type2:entity21:entity22)OR type5: |
| // entity51,type6:entity61:entity66) OR (type1:entity14,type2:entity21: |
| // entity22 AND (type3:entity32:entity35,type4:entity42)) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + |
| "?isrelatedto=(!(type3:entity31,type2:entity21:entity22)%20OR%20" + |
| "type5:entity51,type6:entity61:entity66)%20OR%20(type1:entity14," + |
| "type2:entity21:entity22%20AND%20(type3:entity32:entity35,"+ |
| "type4:entity42))"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| |
| // relatesto=!(type3:entity31,type2:entity21:entity22)OR type5:entity51, |
| // type6:entity61:entity66 |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + |
| "?relatesto=!%20(type3:entity31,type2:entity21:entity22%20)%20OR%20" + |
| "type5:entity51,type6:entity61:entity66"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity2")); |
| } |
| |
| // relatesto=(!(type3:entity31,type2:entity21:entity22)OR type5:entity51, |
| // type6:entity61:entity66) OR (type1:entity14,type2:entity21:entity22 AND |
| // (type3:entity32:entity35 , type4:entity42)) |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + |
| "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + |
| "?relatesto=(!(%20type3:entity31,type2:entity21:entity22)%20OR%20" + |
| "type5:entity51,type6:entity61:entity66%20)%20OR%20(type1:entity14," + |
| "type2:entity21:entity22%20AND%20(type3:entity32:entity35%20,%20"+ |
| "type4:entity42))"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue(entity.getId().equals("entity1") || |
| entity.getId().equals("entity2")); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| /** |
| * Tests if specific configs and metrics are retrieve for getEntity call. |
| */ |
| @Test |
| public void testGetEntityDataToRetrieve() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1/entity2?confstoretrieve=cfg_,configuration_"); |
| ClientResponse resp = getResponse(client, uri); |
| TimelineEntity entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("entity2", entity.getId()); |
| assertEquals("type1", entity.getType()); |
| assertEquals(2, entity.getConfigs().size()); |
| for (String configKey : entity.getConfigs().keySet()) { |
| assertTrue(configKey.startsWith("configuration_") || |
| configKey.startsWith("cfg_")); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1/entity2?confstoretrieve=!(cfg_,configuration_)"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("entity2", entity.getId()); |
| assertEquals("type1", entity.getType()); |
| assertEquals(1, entity.getConfigs().size()); |
| for (String configKey : entity.getConfigs().keySet()) { |
| assertTrue(configKey.startsWith("config_")); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1/entity2?metricstoretrieve=MAP1_,HDFS_"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("entity2", entity.getId()); |
| assertEquals("type1", entity.getType()); |
| assertEquals(2, entity.getMetrics().size()); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP1_") || |
| metric.getId().startsWith("HDFS_")); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("entity2", entity.getId()); |
| assertEquals("type1", entity.getType()); |
| assertEquals(1, entity.getMetrics().size()); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP11_")); |
| assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType()); |
| assertEquals(1, metric.getValues().size()); |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1111/" + |
| "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)&" + |
| "metricslimit=5"); |
| resp = getResponse(client, uri); |
| entity = resp.getEntity(TimelineEntity.class); |
| assertNotNull(entity); |
| assertEquals("entity2", entity.getId()); |
| assertEquals("type1", entity.getType()); |
| assertEquals(1, entity.getMetrics().size()); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getId().startsWith("MAP11_")); |
| assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType()); |
| } |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowRunApps() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + |
| "1002345678919/apps?fields=ALL"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue("Unexpected app in result", |
| (entity.getId().equals("application_1111111111_1111") && |
| entity.getMetrics().size() == 3) || |
| (entity.getId().equals("application_1111111111_2222") && |
| entity.getMetrics().size() == 1)); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType()); |
| assertEquals(1, metric.getValues().size()); |
| } |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + |
| "1002345678919/apps?fields=ALL&metricslimit=2"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue("Unexpected app in result", |
| (entity.getId().equals("application_1111111111_1111") && |
| entity.getMetrics().size() == 3) || |
| (entity.getId().equals("application_1111111111_2222") && |
| entity.getMetrics().size() == 1)); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| assertTrue(metric.getValues().size() <= 2); |
| assertEquals(TimelineMetric.Type.TIME_SERIES, metric.getType()); |
| } |
| } |
| |
| // Query without specifying cluster ID. |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/users/user1/flows/flow_name/runs/1002345678919/apps"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(2, entities.size()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/users/user1/flows/flow_name/runs/1002345678919/" + |
| "apps?limit=1"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowApps() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + |
| "fields=ALL"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(3, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue("Unexpected app in result", |
| (entity.getId().equals("application_1111111111_1111") && |
| entity.getConfigs().size() == 1 && |
| entity.getConfigs().equals(ImmutableMap.of("cfg2", "value1"))) || |
| (entity.getId().equals("application_1111111111_2222") && |
| entity.getConfigs().size() == 1 && |
| entity.getConfigs().equals(ImmutableMap.of("cfg1", "value1"))) || |
| (entity.getId().equals("application_1111111111_2224") && |
| entity.getConfigs().size() == 0)); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| if (entity.getId().equals("application_1111111111_1111")) { |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "HDFS_BYTES_READ", ts - 80000, 57L); |
| TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 40L); |
| TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP1_SLOT_MILLIS", ts - 80000, 40L); |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } else if (entity.getId().equals("application_1111111111_2222")) { |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 101L); |
| assertTrue(verifyMetrics(metric, m1)); |
| } else if (entity.getId().equals("application_1111111111_2224")) { |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE, |
| "MAP_SLOT_MILLIS", ts - 80000, 101L); |
| assertTrue(verifyMetrics(metric, m1)); |
| } |
| } |
| } |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + |
| "fields=ALL&metricslimit=6"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(3, entities.size()); |
| for (TimelineEntity entity : entities) { |
| assertTrue("Unexpected app in result", |
| (entity.getId().equals("application_1111111111_1111") && |
| entity.getConfigs().size() == 1 && |
| entity.getConfigs().equals(ImmutableMap.of("cfg2", "value1"))) || |
| (entity.getId().equals("application_1111111111_2222") && |
| entity.getConfigs().size() == 1 && |
| entity.getConfigs().equals(ImmutableMap.of("cfg1", "value1"))) || |
| (entity.getId().equals("application_1111111111_2224") && |
| entity.getConfigs().size() == 0)); |
| for (TimelineMetric metric : entity.getMetrics()) { |
| if (entity.getId().equals("application_1111111111_1111")) { |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES, |
| "HDFS_BYTES_READ", ts - 80000, 57L); |
| m1.addValue(ts - 100000, 31L); |
| TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES, |
| "MAP_SLOT_MILLIS", ts - 80000, 40L); |
| m2.addValue(ts - 100000, 2L); |
| TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES, |
| "MAP1_SLOT_MILLIS", ts - 80000, 40L); |
| m3.addValue(ts - 100000, 2L); |
| assertTrue(verifyMetrics(metric, m1, m2, m3)); |
| } else if (entity.getId().equals("application_1111111111_2222")) { |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES, |
| "MAP_SLOT_MILLIS", ts - 80000, 101L); |
| m1.addValue(ts - 100000, 5L); |
| assertTrue(verifyMetrics(metric, m1)); |
| } else if (entity.getId().equals("application_1111111111_2224")) { |
| TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES, |
| "MAP_SLOT_MILLIS", ts - 80000, 101L); |
| m1.addValue(ts - 100000, 5L); |
| assertTrue(verifyMetrics(metric, m1)); |
| } |
| } |
| } |
| |
| // Query without specifying cluster ID. |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/users/user1/flows/flow_name/apps"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(3, entities.size()); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/users/user1/flows/flow_name/apps?limit=1"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowAppsFilters() throws Exception { |
| Client client = createClient(); |
| try { |
| String entityType = TimelineEntityType.YARN_APPLICATION.toString(); |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + |
| "eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| assertTrue("Unexpected app in result", entities.contains( |
| newEntity(entityType, "application_1111111111_1111"))); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + |
| "metricfilters=HDFS_BYTES_READ%20ge%200"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| assertTrue("Unexpected app in result", entities.contains( |
| newEntity(entityType, "application_1111111111_1111"))); |
| |
| uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + |
| "conffilters=cfg1%20eq%20value1"); |
| resp = getResponse(client, uri); |
| entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertNotNull(entities); |
| assertEquals(1, entities.size()); |
| assertTrue("Unexpected app in result", entities.contains( |
| newEntity(entityType, "application_1111111111_2222"))); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowRunNotPresent() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + |
| "1002345678929"); |
| verifyHttpResponse(client, uri, Status.NOT_FOUND); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowsNotPresent() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster2/flows"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<FlowActivityEntity> entities = |
| resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetAppNotPresent() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster1/apps/application_1111111111_1378"); |
| verifyHttpResponse(client, uri, Status.NOT_FOUND); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowRunAppsNotPresent() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster2/users/user1/flows/flow_name/runs/" + |
| "1002345678919/apps"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @Test |
| public void testGetFlowAppsNotPresent() throws Exception { |
| Client client = createClient(); |
| try { |
| URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + |
| "timeline/clusters/cluster2/users/user1/flows/flow_name55/apps"); |
| ClientResponse resp = getResponse(client, uri); |
| Set<TimelineEntity> entities = |
| resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); |
| assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); |
| assertNotNull(entities); |
| assertEquals(0, entities.size()); |
| } finally { |
| client.destroy(); |
| } |
| } |
| |
| @After |
| public void stop() throws Exception { |
| if (server != null) { |
| server.stop(); |
| server = null; |
| } |
| } |
| } |