YARN-11786. Upgrade hadoop-yarn-server-timelineservice-hbase-tests to Support Trunk Compilation and Remove compatible hadoop version. (#7453)
* Upgrade hadoop-yarn-server-timelineservice-hbase-tests to Support Trunk Compilation and Remove compatible hadoop version.
Co-authored-by: Chris Nauroth <cnauroth@apache.org>
Co-authored-by: Hualong Zhang <hualong.z@hotmail.com>
Reviewed-by: Chris Nauroth <cnauroth@apache.org>
Reviewed-by: Hualong Zhang <hualong.z@hotmail.com>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
diff --git a/BUILDING.txt b/BUILDING.txt
index 191df09..71a7b83 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -163,7 +163,7 @@
YARN Application Timeline Service V2 build options:
YARN Timeline Service v.2 chooses Apache HBase as the primary backing storage. The supported
- version of Apache HBase is 2.5.8.
+ version of Apache HBase is 2.6.1.
Snappy build options:
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 5c2b226..1c04652 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -222,7 +222,7 @@
<swagger-annotations-version>1.5.4</swagger-annotations-version>
<snakeyaml.version>2.0</snakeyaml.version>
<sshd.version>2.11.0</sshd.version>
- <hbase.version>2.5.8-hadoop3</hbase.version>
+ <hbase.version>2.6.1-hadoop3</hbase.version>
<junit.version>4.13.2</junit.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<junit.vintage.version>5.8.2</junit.vintage.version>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
index 513df16..f968476 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
@@ -19,6 +19,7 @@
import javax.xml.bind.annotation.XmlElement;
+import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -27,6 +28,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
+@JsonInclude(JsonInclude.Include.NON_NULL)
public class FlowRunEntity extends HierarchicalTimelineEntity {
public static final String USER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
@@ -107,7 +109,7 @@ public void setRunId(long runId) {
addInfo(FLOW_RUN_ID_INFO_KEY, runId);
}
- public long getStartTime() {
+ public Long getStartTime() {
return getCreatedTime();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java
index 3d5978d..81269c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java
@@ -43,11 +43,13 @@
public class TimelineEntitySetWriter implements MessageBodyWriter<Set<TimelineEntity>> {
private ObjectMapper objectMapper = new ObjectMapper();
+ private String timelineEntityType =
+ "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity>";
@Override
public boolean isWriteable(Class<?> type, Type genericType,
Annotation[] annotations, MediaType mediaType) {
- return true;
+ return timelineEntityType.equals(genericType.getTypeName());
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index 025bc3f..97cde1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -89,7 +89,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>${hbase-compatible-hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -104,10 +103,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
</exclusions>
</dependency>
@@ -131,13 +126,22 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this direct
dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
- <version>${hbase-compatible-hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -243,13 +247,6 @@
</dependency>
<dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- <version>1.19.4</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>1.1.1</version>
@@ -369,7 +366,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>${hbase-compatible-hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
@@ -378,10 +374,6 @@
<artifactId>hadoop-auth</artifactId>
</exclusion>
<exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
- <exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
@@ -393,7 +385,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
- <version>${hbase-compatible-hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -412,7 +403,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
- <version>${hbase-compatible-hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
@@ -430,7 +420,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
- <version>${hbase-compatible-hadoop.version}</version>
<scope>test</scope>
</dependency>
@@ -507,6 +496,12 @@
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-json-jettison</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java
index 471fb6c..dbe9546 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java
@@ -22,32 +22,29 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+
import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
import java.net.URI;
-import java.net.URL;
import java.util.List;
+import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.junit.Assert;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.GenericType;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-
/**
* Test Base for TimelineReaderServer HBase tests.
*/
@@ -109,19 +106,25 @@ protected void addFilters(Configuration conf) {
}
protected Client createClient() {
- ClientConfig cfg = new DefaultClientConfig();
- cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
- return new Client(
- new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg);
+ final ClientConfig cc = new ClientConfig();
+ cc.connectorProvider(getHttpURLConnectionFactory());
+ return ClientBuilder.newClient(cc)
+ .register(TimelineEntityReader.class)
+ .register(TimelineEntitySetReader.class)
+ .register(TimelineEntityListReader.class)
+ .register(FlowActivityEntityReader.class)
+ .register(FlowRunEntityReader.class)
+ .register(FlowActivityEntitySetReader.class)
+ .register(FlowActivityEntityListReader.class)
+ .register(FlowRunEntitySetReader.class);
}
- protected ClientResponse getResponse(Client client, URI uri)
+ protected Response getResponse(Client client, URI uri)
throws Exception {
- ClientResponse resp =
- client.resource(uri).accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ Response resp =
+ client.target(uri).request(MediaType.APPLICATION_JSON).get();
if (resp == null || resp.getStatusInfo()
- .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
+ .getStatusCode() != HttpURLConnection.HTTP_OK) {
String msg = "";
if (resp != null) {
msg = String.valueOf(resp.getStatusInfo().getStatusCode());
@@ -132,39 +135,37 @@ protected ClientResponse getResponse(Client client, URI uri)
return resp;
}
- protected void verifyHttpResponse(Client client, URI uri, Status status) {
- ClientResponse resp =
- client.resource(uri).accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ protected void verifyHttpResponse(Client client, URI uri, Response.Status status) {
+ Response resp = client.target(uri).request(MediaType.APPLICATION_JSON).get();
assertNotNull(resp);
assertTrue("Response from server should have been " + status,
resp.getStatusInfo().getStatusCode() == status.getStatusCode());
- System.out.println("Response is: " + resp.getEntity(String.class));
+ System.out.println("Response is: " + resp.readEntity(String.class));
}
protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri,
int noOfEntities) throws Exception {
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
List<FlowActivityEntity> entities =
- resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
+ resp.readEntity(new GenericType<List<FlowActivityEntity>>() {
});
assertNotNull(entities);
assertEquals(noOfEntities, entities.size());
return entities;
}
- protected static class DummyURLConnectionFactory
- implements HttpURLConnectionFactory {
-
- @Override
- public HttpURLConnection getHttpURLConnection(final URL url)
- throws IOException {
- try {
- return (HttpURLConnection) url.openConnection();
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- }
- }
+ @VisibleForTesting
+ protected HttpUrlConnectorProvider getHttpURLConnectionFactory() {
+ return new HttpUrlConnectorProvider().connectionFactory(
+ url -> {
+ HttpURLConnection conn;
+ try {
+ conn = (HttpURLConnection) url.openConnection();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return conn;
+ });
}
protected static HBaseTestingUtility getHBaseTestingUtility() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityListReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityListReader.java
new file mode 100644
index 0000000..db6eb8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityListReader.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * We have defined a dedicated Reader for `List<FlowActivityEntity>`,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into `List<FlowActivityEntity>`.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class FlowActivityEntityListReader implements MessageBodyReader<List<FlowActivityEntity>> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private String timelineEntityType =
+ "java.util.List<org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity>";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return timelineEntityType.equals(genericType.getTypeName());
+ }
+
+ @Override
+ public List<FlowActivityEntity> readFrom(Class<List<FlowActivityEntity>> type,
+ Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ List<FlowActivityEntity> flowActivityEntityList = new ArrayList<>();
+
+ JsonNode jsonNode = objectMapper.readTree(entityStream);
+ if (jsonNode.isArray()) {
+ for (JsonNode jNode : jsonNode) {
+ FlowActivityEntity entity = new FlowActivityEntity();
+
+ // Get Identifier
+ JsonNode jnIdentifier = jNode.get("identifier");
+ JsonNode jnType = jnIdentifier.get("type");
+ JsonNode jnId = jnIdentifier.get("id");
+ TimelineEntity.Identifier identifier =
+ new TimelineEntity.Identifier(jnType.asText(), jnId.asText());
+ entity.setIdentifier(identifier);
+
+ // Get Type
+ JsonNode jnAppType = jNode.get("type");
+ entity.setType(jnAppType.asText());
+
+ // Get Createdtime
+ JsonNode jnCreatedTime = jNode.get("createdtime");
+ entity.setCreatedTime(jnCreatedTime.asLong());
+
+ // Get configs
+ JsonNode jnConfigs = jNode.get("configs");
+ if (jnConfigs != null) {
+ Map<String, String> configInfos =
+ objectMapper.treeToValue(jnConfigs, Map.class);
+ entity.setConfigs(configInfos);
+ }
+
+ // Get info
+ JsonNode jnInfos = jNode.get("info");
+ if (jnInfos != null) {
+ Map<String, Object> entityInfos =
+ objectMapper.treeToValue(jnInfos, Map.class);
+ entity.setInfo(entityInfos);
+ }
+
+ // Get BasicInfo
+ entity.setDate(jNode.get("date").asLong());
+ entity.setCluster(jNode.get("cluster").asText());
+ entity.setUser(jNode.get("user").asText());
+ entity.setFlowName(jNode.get("flowName").asText());
+
+ // Get flowRuns
+ JsonNode jnflowRuns = jNode.get("flowRuns");
+ if (jnflowRuns != null) {
+ for (JsonNode jnflow : jnflowRuns) {
+ FlowRunEntity flowRunEntity = objectMapper.treeToValue(jnflow, FlowRunEntity.class);
+ entity.addFlowRun(flowRunEntity);
+ }
+ }
+ flowActivityEntityList.add(entity);
+ }
+ }
+
+ return flowActivityEntityList;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityReader.java
new file mode 100644
index 0000000..c85f701
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityReader.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+/**
+ * We have defined a dedicated Reader for FlowActivityEntity,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into FlowActivityEntity.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class FlowActivityEntityReader implements MessageBodyReader<FlowActivityEntity> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return type == FlowActivityEntity.class;
+ }
+
+ @Override
+ public FlowActivityEntity readFrom(Class<FlowActivityEntity> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ try {
+ FlowActivityEntity timelineEntity =
+ objectMapper.readValue(entityStream, FlowActivityEntity.class);
+ return timelineEntity;
+ } catch (Exception e) {
+ return new FlowActivityEntity();
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntitySetReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntitySetReader.java
new file mode 100644
index 0000000..06a96da
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntitySetReader.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * We have defined a dedicated Reader for `Set<FlowActivityEntity>`,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into `Set<FlowActivityEntity>`.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class FlowActivityEntitySetReader implements MessageBodyReader<Set<FlowActivityEntity>> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private String timelineEntityType =
+ "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity>";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return timelineEntityType.equals(genericType.getTypeName());
+ }
+
+ @Override
+ public Set<FlowActivityEntity> readFrom(Class<Set<FlowActivityEntity>> type,
+ Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ Set<FlowActivityEntity> flowActivityEntitySet = new HashSet<>();
+
+ JsonNode jsonNode = objectMapper.readTree(entityStream);
+ if (jsonNode.isArray()) {
+ for (JsonNode jNode : jsonNode) {
+ FlowActivityEntity entity = new FlowActivityEntity();
+
+ // Get Identifier
+ JsonNode jnIdentifier = jNode.get("identifier");
+ JsonNode jnType = jnIdentifier.get("type");
+ JsonNode jnId = jnIdentifier.get("id");
+ TimelineEntity.Identifier identifier =
+ new TimelineEntity.Identifier(jnType.asText(), jnId.asText());
+ entity.setIdentifier(identifier);
+
+ // Get Type
+ JsonNode jnAppType = jNode.get("type");
+ entity.setType(jnAppType.asText());
+
+ // Get Createdtime
+ JsonNode jnCreatedTime = jNode.get("createdtime");
+ entity.setCreatedTime(jnCreatedTime.asLong());
+
+ // Get configs
+ JsonNode jnConfigs = jNode.get("configs");
+ if (jnConfigs != null) {
+ Map<String, String> configInfos =
+ objectMapper.treeToValue(jnConfigs, Map.class);
+ entity.setConfigs(configInfos);
+ }
+
+ // Get info
+ JsonNode jnInfos = jNode.get("info");
+ if (jnInfos != null) {
+ Map<String, Object> entityInfos =
+ objectMapper.treeToValue(jnInfos, Map.class);
+ entity.setInfo(entityInfos);
+ }
+
+ // Get BasicInfo
+ entity.setDate(jNode.get("date").asLong());
+ entity.setCluster(jNode.get("cluster").asText());
+ entity.setUser(jNode.get("user").asText());
+ entity.setFlowName(jNode.get("flowName").asText());
+
+ // Get flowRuns
+ JsonNode jnflowRuns = jNode.get("flowRuns");
+ if (jnflowRuns != null) {
+ for (JsonNode jnflow : jnflowRuns) {
+ FlowRunEntity flowRunEntity = objectMapper.treeToValue(jnflow, FlowRunEntity.class);
+ entity.addFlowRun(flowRunEntity);
+ }
+ }
+
+ flowActivityEntitySet.add(entity);
+ }
+ }
+
+ return flowActivityEntitySet;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntityReader.java
new file mode 100644
index 0000000..87453fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntityReader.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+/**
+ * We have defined a dedicated Reader for FlowRunEntity,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into FlowRunEntity.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class FlowRunEntityReader implements MessageBodyReader<FlowRunEntity> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return type == FlowRunEntity.class;
+ }
+
+ @Override
+ public FlowRunEntity readFrom(Class<FlowRunEntity> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ try {
+ FlowRunEntity timelineEntity = objectMapper.readValue(entityStream, FlowRunEntity.class);
+ return timelineEntity;
+ } catch (Exception e) {
+ return new FlowRunEntity();
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntitySetReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntitySetReader.java
new file mode 100644
index 0000000..4598a27
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntitySetReader.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * We have defined a dedicated Reader for `Set<FlowActivityEntity>`,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into `Set<FlowActivityEntity>`.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class FlowRunEntitySetReader implements MessageBodyReader<Set<FlowRunEntity>> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private String timelineEntityType =
+ "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity>";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return timelineEntityType.equals(genericType.getTypeName());
+ }
+
+ @Override
+ public Set<FlowRunEntity> readFrom(Class<Set<FlowRunEntity>> type,
+ Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ Set<FlowRunEntity> flowRunEntitySet = new HashSet<>();
+
+ JsonNode jsonNode = objectMapper.readTree(entityStream);
+ if (jsonNode.isArray()) {
+ for (JsonNode jNode : jsonNode) {
+ FlowRunEntity flowRunEntity = objectMapper.treeToValue(jNode, FlowRunEntity.class);
+ flowRunEntitySet.add(flowRunEntity);
+ }
+ }
+
+ return flowRunEntitySet;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 2a55e0e..e548853 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -23,6 +23,8 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import javax.ws.rs.client.Client;
+
import java.io.IOException;
import java.net.URI;
import java.text.DateFormat;
@@ -35,7 +37,9 @@
import java.util.Map;
import java.util.Set;
+import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -60,10 +64,6 @@
import org.junit.Test;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.GenericType;
/**
* Test TimelineReder Web Service REST API's.
@@ -452,10 +452,10 @@ public void testGetFlowRun() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919");
- ClientResponse resp = getResponse(client, uri);
- FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ Response resp = getResponse(client, uri);
+ FlowRunEntity entity = resp.readEntity(FlowRunEntity.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entity);
assertEquals("user1@flow_name/1002345678919", entity.getId());
assertEquals(3, entity.getMetrics().size());
@@ -473,7 +473,7 @@ public void testGetFlowRun() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/users/user1/flows/flow_name/runs/1002345678919");
resp = getResponse(client, uri);
- entity = resp.getEntity(FlowRunEntity.class);
+ entity = resp.readEntity(FlowRunEntity.class);
assertNotNull(entity);
assertEquals("user1@flow_name/1002345678919", entity.getId());
assertEquals(3, entity.getMetrics().size());
@@ -487,7 +487,7 @@ public void testGetFlowRun() throws Exception {
assertTrue(verifyMetrics(metric, m1, m2, m3));
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -497,11 +497,11 @@ public void testGetFlowRuns() throws Exception {
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<FlowRunEntity> entities =
- resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(2, entities.size());
for (FlowRunEntity entity : entities) {
@@ -519,9 +519,9 @@ public void testGetFlowRuns() throws Exception {
URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/"
+ "clusters/cluster1/users/user1/flows/flow_name/runs?limit=1");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(1, entities.size());
for (FlowRunEntity entity : entities) {
@@ -536,9 +536,9 @@ public void testGetFlowRuns() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"createdtimestart=1425016501030");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(1, entities.size());
for (FlowRunEntity entity : entities) {
@@ -553,9 +553,9 @@ public void testGetFlowRuns() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"createdtimestart=1425016500999&createdtimeend=1425016501035");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(2, entities.size());
for (FlowRunEntity entity : entities) {
@@ -573,9 +573,9 @@ public void testGetFlowRuns() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"createdtimeend=1425016501030");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(1, entities.size());
for (FlowRunEntity entity : entities) {
@@ -590,9 +590,9 @@ public void testGetFlowRuns() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"fields=metrics");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(2, entities.size());
for (FlowRunEntity entity : entities) {
@@ -612,9 +612,9 @@ public void testGetFlowRuns() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"fields=CONFIGS");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -625,11 +625,11 @@ public void testGetFlowRunsMetricsToRetrieve() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"metricstoretrieve=MAP_,HDFS_");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<FlowRunEntity> entities =
- resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(2, entities.size());
int metricCnt = 0;
@@ -646,9 +646,9 @@ public void testGetFlowRunsMetricsToRetrieve() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
"metricstoretrieve=!(MAP_,HDFS_)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(2, entities.size());
metricCnt = 0;
@@ -660,7 +660,7 @@ public void testGetFlowRunsMetricsToRetrieve() throws Exception {
}
assertEquals(1, metricCnt);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -670,10 +670,10 @@ public void testGetEntitiesByUID() throws Exception {
try {
// Query all flows.
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
- "timeline/flows");
- ClientResponse resp = getResponse(client, uri);
+ "timeline/flows/");
+ Response resp = getResponse(client, uri);
Set<FlowActivityEntity> flowEntities =
- resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ resp.readEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(flowEntities);
assertEquals(3, flowEntities.size());
List<String> listFlowUIDs = new ArrayList<String>();
@@ -699,7 +699,7 @@ public void testGetEntitiesByUID() throws Exception {
"timeline/flow-uid/" + flowUID + "/runs");
resp = getResponse(client, uri);
Set<FlowRunEntity> frEntities =
- resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ resp.readEntity(new GenericType<Set<FlowRunEntity>>(){});
assertNotNull(frEntities);
for (FlowRunEntity entity : frEntities) {
String flowRunUID =
@@ -718,7 +718,7 @@ public void testGetEntitiesByUID() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/run-uid/" + flowRunUID);
resp = getResponse(client, uri);
- FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
+ FlowRunEntity entity = resp.readEntity(FlowRunEntity.class);
assertNotNull(entity);
}
@@ -731,7 +731,7 @@ public void testGetEntitiesByUID() throws Exception {
"timeline/run-uid/" + flowRunUID + "/apps");
resp = getResponse(client, uri);
Set<TimelineEntity> appEntities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(appEntities);
for (TimelineEntity entity : appEntities) {
String appUID =
@@ -750,7 +750,7 @@ public void testGetEntitiesByUID() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/app-uid/" + appUID);
resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
}
@@ -764,7 +764,7 @@ public void testGetEntitiesByUID() throws Exception {
"timeline/app-uid/" + appUID + "/entities/type1");
resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
for (TimelineEntity entity : entities) {
String entityUID =
@@ -785,40 +785,40 @@ public void testGetEntitiesByUID() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/entity-uid/" + entityUID);
resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
}
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/flow-uid/dummy:flow/runs");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/run-uid/dummy:flowrun");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
// Run Id is not a numerical value.
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/run-uid/some:dummy:flow:123v456");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/run-uid/dummy:flowrun/apps");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/app-uid/dummy:app");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/app-uid/dummy:app/entities/type1");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/entity-uid/dummy:entity");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -830,8 +830,8 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
"cluster1!user1!flow_name!1002345678919!application_1111111111_1111";
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/"+
"timeline/app-uid/" + appUIDWithFlowInfo);
- ClientResponse resp = getResponse(client, uri);
- TimelineEntity appEntity1 = resp.getEntity(TimelineEntity.class);
+ Response resp = getResponse(client, uri);
+ TimelineEntity appEntity1 = resp.readEntity(TimelineEntity.class);
assertNotNull(appEntity1);
assertEquals(
TimelineEntityType.YARN_APPLICATION.toString(), appEntity1.getType());
@@ -842,7 +842,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
+ "app-uid/" + appUIDWithFlowInfo + "/entities/type1");
resp = getResponse(client, uri);
Set<TimelineEntity> entities1 =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities1);
assertEquals(2, entities1.size());
for (TimelineEntity entity : entities1) {
@@ -859,7 +859,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
uri = URI.create("http://localhost:" + getServerPort()
+ "/ws/v2/timeline/" + "app-uid/" + appUIDWithoutFlowInfo);
resp = getResponse(client, uri);
- TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class);
+ TimelineEntity appEntity2 = resp.readEntity(TimelineEntity.class);
assertNotNull(appEntity2);
assertEquals(
TimelineEntityType.YARN_APPLICATION.toString(), appEntity2.getType());
@@ -870,7 +870,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
+ "app-uid/" + appUIDWithoutFlowInfo + "/entities/type1");
resp = getResponse(client, uri);
Set<TimelineEntity> entities2 =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities2);
assertEquals(2, entities2.size());
for (TimelineEntity entity : entities2) {
@@ -887,7 +887,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
uri = URI.create("http://localhost:" + getServerPort()
+ "/ws/v2/timeline/" + "entity-uid/" + entityUIDWithFlowInfo);
resp = getResponse(client, uri);
- TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class);
+ TimelineEntity singleEntity1 = resp.readEntity(TimelineEntity.class);
assertNotNull(singleEntity1);
assertEquals("type1", singleEntity1.getType());
assertEquals("entity1", singleEntity1.getId());
@@ -897,12 +897,12 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
uri = URI.create("http://localhost:" + getServerPort()
+ "/ws/v2/timeline/" + "entity-uid/" + entityUIDWithoutFlowInfo);
resp = getResponse(client, uri);
- TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class);
+ TimelineEntity singleEntity2 = resp.readEntity(TimelineEntity.class);
assertNotNull(singleEntity2);
assertEquals("type1", singleEntity2.getType());
assertEquals("entity1", singleEntity2.getId());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -914,9 +914,9 @@ public void testUIDNotProperlyEscaped() throws Exception {
"cluster1!user*1!flow_name!1002345678919!application_1111111111_1111";
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/"+
"timeline/app-uid/" + appUID);
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -971,21 +971,21 @@ public void testGetFlows() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=20150711:20150714");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=20150714-20150711");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=2015071129-20150712");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=20150711-2015071243");
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1022,7 +1022,7 @@ public void testGetFlowsForPagination() throws Exception {
flowEntites = verifyFlowEntites(client, uri, 1);
assertEquals(fEntity3, flowEntites.get(0));
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1033,8 +1033,8 @@ public void testGetApp() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111?" +
"userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919");
- ClientResponse resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ Response resp = getResponse(client, uri);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("application_1111111111_1111", entity.getId());
assertEquals(3, entity.getMetrics().size());
@@ -1052,7 +1052,7 @@ public void testGetApp() throws Exception {
"timeline/apps/application_1111111111_2222?userid=user1" +
"&fields=metrics&flowname=flow_name&flowrunid=1002345678919");
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("application_1111111111_2222", entity.getId());
assertEquals(1, entity.getMetrics().size());
@@ -1062,7 +1062,7 @@ public void testGetApp() throws Exception {
assertTrue(verifyMetrics(metric, m4));
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1073,8 +1073,8 @@ public void testGetAppWithoutFlowInfo() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111?" +
"fields=ALL");
- ClientResponse resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ Response resp = getResponse(client, uri);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("application_1111111111_1111", entity.getId());
assertEquals(1, entity.getConfigs().size());
@@ -1093,7 +1093,7 @@ public void testGetAppWithoutFlowInfo() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111?" +
"fields=ALL&metricslimit=10");
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("application_1111111111_1111", entity.getId());
assertEquals(1, entity.getConfigs().size());
@@ -1111,7 +1111,7 @@ public void testGetAppWithoutFlowInfo() throws Exception {
assertTrue(verifyMetrics(metric, m1, m2, m3));
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1122,13 +1122,13 @@ public void testGetEntityWithoutFlowInfo() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity1");
- ClientResponse resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ Response resp = getResponse(client, uri);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity1", entity.getId());
assertEquals("type1", entity.getType());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1139,9 +1139,9 @@ public void testGetEntitiesWithoutFlowInfo() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1149,7 +1149,7 @@ public void testGetEntitiesWithoutFlowInfo() throws Exception {
entity.getId().equals("entity2"));
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1163,9 +1163,9 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?confstoretrieve=cfg_");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
int cfgCnt = 0;
@@ -1181,7 +1181,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?confstoretrieve=cfg_,config_");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
cfgCnt = 0;
@@ -1198,7 +1198,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?confstoretrieve=!(cfg_,config_)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
cfgCnt = 0;
@@ -1214,7 +1214,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?metricstoretrieve=MAP_");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
int metricCnt = 0;
@@ -1230,7 +1230,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?metricstoretrieve=MAP1_,HDFS_");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
metricCnt = 0;
@@ -1247,7 +1247,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?metricstoretrieve=!(MAP1_,HDFS_)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
metricCnt = 0;
@@ -1260,7 +1260,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception {
}
assertEquals(2, metricCnt);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1272,9 +1272,9 @@ public void testGetEntitiesConfigFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?conffilters=config_param1%20eq%20value1%20OR%20" +
"config_param1%20eq%20value3");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1287,7 +1287,7 @@ public void testGetEntitiesConfigFilters() throws Exception {
"entities/type1?conffilters=config_param1%20eq%20value1%20AND" +
"%20configuration_param2%20eq%20value2");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(0, entities.size());
@@ -1299,7 +1299,7 @@ public void testGetEntitiesConfigFilters() throws Exception {
"%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" +
"%20value3%20AND%20cfg_param3%20eq%20value1)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
int cfgCnt = 0;
@@ -1317,7 +1317,7 @@ public void testGetEntitiesConfigFilters() throws Exception {
"%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" +
"%20value3%20AND%20cfg_param3%20eq%20value1)&fields=CONFIGS");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
cfgCnt = 0;
@@ -1334,7 +1334,7 @@ public void testGetEntitiesConfigFilters() throws Exception {
"%20value3%20AND%20cfg_param3%20eq%20value1)&confstoretrieve=cfg_," +
"configuration_");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
cfgCnt = 0;
@@ -1357,7 +1357,7 @@ public void testGetEntitiesConfigFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?conffilters=configuration_param2%20ne%20value3");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1369,14 +1369,14 @@ public void testGetEntitiesConfigFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?conffilters=configuration_param2%20ene%20value3");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
assertEquals("entity2", entity.getId());
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1389,9 +1389,9 @@ public void testGetEntitiesInfoFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?infofilters=info1%20eq%20cluster1%20OR%20info1%20eq" +
"%20cluster2");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1405,7 +1405,7 @@ public void testGetEntitiesInfoFilters() throws Exception {
"entities/type1?infofilters=info1%20eq%20cluster1%20AND%20info4%20" +
"eq%2035000");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(0, entities.size());
@@ -1415,7 +1415,7 @@ public void testGetEntitiesInfoFilters() throws Exception {
"entities/type1?infofilters=info4%20eq%2035000%20OR%20info4%20eq" +
"%2036000");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1431,7 +1431,7 @@ public void testGetEntitiesInfoFilters() throws Exception {
"eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0" +
")");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
int infoCnt = 0;
@@ -1451,7 +1451,7 @@ public void testGetEntitiesInfoFilters() throws Exception {
"eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%20" +
"2.0)&fields=INFO");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
infoCnt = 0;
@@ -1471,7 +1471,7 @@ public void testGetEntitiesInfoFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?infofilters=info3%20ne%2039000");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1483,14 +1483,14 @@ public void testGetEntitiesInfoFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?infofilters=info3%20ene%2039000");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
assertEquals("entity1", entity.getId());
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1503,9 +1503,9 @@ public void testGetEntitiesMetricFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20OR%20" +
"HDFS_BYTES_READ%20eq%20157");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1519,7 +1519,7 @@ public void testGetEntitiesMetricFilters() throws Exception {
"entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20AND%20" +
"MAP_SLOT_MILLIS%20gt%2040");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(0, entities.size());
@@ -1531,7 +1531,7 @@ public void testGetEntitiesMetricFilters() throws Exception {
"MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" +
"%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
int metricCnt = 0;
@@ -1549,7 +1549,7 @@ public void testGetEntitiesMetricFilters() throws Exception {
"MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" +
"%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&fields=METRICS");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
metricCnt = 0;
@@ -1568,7 +1568,7 @@ public void testGetEntitiesMetricFilters() throws Exception {
"%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" +
"!(HDFS)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
metricCnt = 0;
@@ -1589,7 +1589,7 @@ public void testGetEntitiesMetricFilters() throws Exception {
"%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" +
"!(HDFS)&metricslimit=10");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
metricCnt = 0;
@@ -1619,7 +1619,7 @@ public void testGetEntitiesMetricFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ne%20100");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1631,14 +1631,14 @@ public void testGetEntitiesMetricFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ene%20100");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
assertEquals("entity2", entity.getId());
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1649,9 +1649,9 @@ public void testGetEntitiesEventFilters() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?eventfilters=event1,event3");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1663,7 +1663,7 @@ public void testGetEntitiesEventFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?eventfilters=!(event1,event3)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(0, entities.size());
@@ -1672,7 +1672,7 @@ public void testGetEntitiesEventFilters() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?eventfilters=!(event1,event3)%20OR%20event5,event6");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
@@ -1686,7 +1686,7 @@ public void testGetEntitiesEventFilters() throws Exception {
"entities/type1?eventfilters=(!(event1,event3)%20OR%20event5," +
"event6)%20OR%20(event1,event2%20AND%20(event3,event4))");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1694,7 +1694,7 @@ public void testGetEntitiesEventFilters() throws Exception {
entity.getId().equals("entity2"));
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1705,9 +1705,9 @@ public void testGetEntitiesRelationFilters() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?isrelatedto=type3:entity31,type2:entity21:entity22");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1720,7 +1720,7 @@ public void testGetEntitiesRelationFilters() throws Exception {
"clusters/cluster1/apps/application_1111111111_1111/entities/type1" +
"?isrelatedto=!(type3:entity31,type2:entity21:entity22)");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(0, entities.size());
@@ -1732,7 +1732,7 @@ public void testGetEntitiesRelationFilters() throws Exception {
"?isrelatedto=!(type3:entity31,type2:entity21:entity22)%20OR%20" +
"type5:entity51,type6:entity61:entity66");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
@@ -1750,7 +1750,7 @@ public void testGetEntitiesRelationFilters() throws Exception {
"type2:entity21:entity22%20AND%20(type3:entity32:entity35,"+
"type4:entity42))");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1766,7 +1766,7 @@ public void testGetEntitiesRelationFilters() throws Exception {
"?relatesto=!%20(type3:entity31,type2:entity21:entity22%20)%20OR%20" +
"type5:entity51,type6:entity61:entity66");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
@@ -1785,7 +1785,7 @@ public void testGetEntitiesRelationFilters() throws Exception {
"type2:entity21:entity22%20AND%20(type3:entity32:entity35%20,%20"+
"type4:entity42))");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -1793,7 +1793,7 @@ public void testGetEntitiesRelationFilters() throws Exception {
entity.getId().equals("entity2"));
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1829,9 +1829,9 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 90000) + "&metricstimeend=" + (ts - 80000));
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
@@ -1841,7 +1841,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 9);
@@ -1851,7 +1851,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 9);
@@ -1861,7 +1861,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"entities/type1?fields=ALL&metricslimit=100&metricstimeend=" +
(ts - 90000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 5);
@@ -1871,7 +1871,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"entities/type1?fields=ALL&metricstimestart=" +
(ts - 100000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 5);
@@ -1881,7 +1881,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"entities/type1/entity2?fields=ALL&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 3);
@@ -1890,7 +1890,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"entities/type1/entity2?fields=ALL&metricslimit=5&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 5);
@@ -1898,9 +1898,9 @@ public void testGetEntitiesMetricsTimeRange() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 80000) + "&metricstimeend=" + (ts - 90000));
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1914,8 +1914,8 @@ public void testGetEntityDataToRetrieve() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?confstoretrieve=cfg_,configuration_");
- ClientResponse resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ Response resp = getResponse(client, uri);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity2", entity.getId());
assertEquals("type1", entity.getType());
@@ -1929,7 +1929,7 @@ public void testGetEntityDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?confstoretrieve=!(cfg_,configuration_)");
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity2", entity.getId());
assertEquals("type1", entity.getType());
@@ -1942,7 +1942,7 @@ public void testGetEntityDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?metricstoretrieve=MAP1_,HDFS_");
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity2", entity.getId());
assertEquals("type1", entity.getType());
@@ -1956,7 +1956,7 @@ public void testGetEntityDataToRetrieve() throws Exception {
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)");
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity2", entity.getId());
assertEquals("type1", entity.getType());
@@ -1972,7 +1972,7 @@ public void testGetEntityDataToRetrieve() throws Exception {
"entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)&" +
"metricslimit=5");
resp = getResponse(client, uri);
- entity = resp.getEntity(TimelineEntity.class);
+ entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity2", entity.getId());
assertEquals("type1", entity.getType());
@@ -1982,7 +1982,7 @@ public void testGetEntityDataToRetrieve() throws Exception {
assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
}
} finally {
- client.destroy();
+ client.close();
}
}
@@ -1993,9 +1993,9 @@ public void testGetFlowRunApps() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -2014,7 +2014,7 @@ public void testGetFlowRunApps() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=2");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
@@ -2033,7 +2033,7 @@ public void testGetFlowRunApps() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/users/user1/flows/flow_name/runs/1002345678919/apps");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
@@ -2041,11 +2041,11 @@ public void testGetFlowRunApps() throws Exception {
"timeline/users/user1/flows/flow_name/runs/1002345678919/" +
"apps?limit=1");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2056,9 +2056,9 @@ public void testGetFlowApps() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
"fields=ALL");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
for (TimelineEntity entity : entities) {
@@ -2096,7 +2096,7 @@ public void testGetFlowApps() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
"fields=ALL&metricslimit=6");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
for (TimelineEntity entity : entities) {
@@ -2139,18 +2139,18 @@ public void testGetFlowApps() throws Exception {
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/users/user1/flows/flow_name/apps");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/users/user1/flows/flow_name/apps?limit=1");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2162,9 +2162,9 @@ public void testGetFlowAppsFilters() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
"eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
assertTrue("Unexpected app in result", entities.contains(
@@ -2174,7 +2174,7 @@ public void testGetFlowAppsFilters() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
"metricfilters=HDFS_BYTES_READ%20ge%200");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
assertTrue("Unexpected app in result", entities.contains(
@@ -2184,13 +2184,13 @@ public void testGetFlowAppsFilters() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
"conffilters=cfg1%20eq%20value1");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
assertTrue("Unexpected app in result", entities.contains(
newEntity(entityType, "application_1111111111_2222")));
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2201,9 +2201,9 @@ public void testGetFlowRunNotPresent() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678929");
- verifyHttpResponse(client, uri, Status.NOT_FOUND);
+ verifyHttpResponse(client, uri, Response.Status.NOT_FOUND);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2213,15 +2213,15 @@ public void testGetFlowsNotPresent() throws Exception {
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster2/flows");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<FlowActivityEntity> entities =
- resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ resp.readEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(0, entities.size());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2231,9 +2231,9 @@ public void testGetAppNotPresent() throws Exception {
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1378");
- verifyHttpResponse(client, uri, Status.NOT_FOUND);
+ verifyHttpResponse(client, uri, Response.Status.NOT_FOUND);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2244,15 +2244,15 @@ public void testGetFlowRunAppsNotPresent() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster2/users/user1/flows/flow_name/runs/" +
"1002345678919/apps");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(0, entities.size());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2262,15 +2262,15 @@ public void testGetFlowAppsNotPresent() throws Exception {
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster2/users/user1/flows/flow_name55/apps");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8",
- resp.getType().toString());
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8",
+ resp.getMediaType().toString());
assertNotNull(entities);
assertEquals(0, entities.size());
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2287,7 +2287,7 @@ public void testGenericEntitiesForPagination() throws Exception {
+ "/entities/entitytype";
verifyEntitiesForPagination(client, resourceUri);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2297,9 +2297,9 @@ private void verifyEntitiesForPagination(Client client, String resourceUri)
String queryParam = "?limit=" + limit;
URI uri = URI.create(resourceUri + queryParam);
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
List<TimelineEntity> entities =
- resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-10 to entity-1 in descending order.
verifyPaginatedEntites(entities, limit, limit);
@@ -2308,7 +2308,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri)
queryParam = "?limit=" + limit;
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-10 to entity-7 in descending order.
TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10);
@@ -2317,7 +2317,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri)
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-7 to entity-4 in descending order.
entity = verifyPaginatedEntites(entities, limit, 7);
@@ -2326,7 +2326,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri)
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-4 to entity-1 in descending order.
entity = verifyPaginatedEntites(entities, limit, 4);
@@ -2335,7 +2335,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri)
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
// always entity-1 will be retrieved
entity = verifyPaginatedEntites(entities, 1, 1);
@@ -2358,9 +2358,9 @@ private TimelineEntity verifyPaginatedEntites(List<TimelineEntity> entities,
private List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri,
int noOfEntities,
int[] a, String[] flowsInSequence) throws Exception {
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
List<FlowActivityEntity> entities =
- resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
+ resp.readEntity(new GenericType<List<FlowActivityEntity>>() {
});
assertNotNull(entities);
assertEquals(noOfEntities, entities.size());
@@ -2384,9 +2384,9 @@ public void testForFlowAppsPagination() throws Exception {
String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/"
+ "timeline/clusters/cluster1/users/user1/flows/flow1/apps";
URI uri = URI.create(resourceUri);
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
List<TimelineEntity> entities =
- resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(totalAppEntities, entities.size());
@@ -2397,7 +2397,7 @@ public void testForFlowAppsPagination() throws Exception {
String queryParam = "?limit=" + limit;
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(limit, entities.size());
@@ -2408,7 +2408,7 @@ public void testForFlowAppsPagination() throws Exception {
URI.create(resourceUri + queryParam + "&fromid="
+ entity10.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(6, entities.size());
@@ -2416,7 +2416,7 @@ public void testForFlowAppsPagination() throws Exception {
assertEquals(entity15, entities.get(5));
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2429,9 +2429,9 @@ public void testForFlowRunAppsPagination() throws Exception {
String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/"
+ "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps";
URI uri = URI.create(resourceUri);
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
List<TimelineEntity> entities =
- resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(totalAppEntities, entities.size());
@@ -2442,7 +2442,7 @@ public void testForFlowRunAppsPagination() throws Exception {
String queryParam = "?limit=" + limit;
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(limit, entities.size());
@@ -2453,7 +2453,7 @@ public void testForFlowRunAppsPagination() throws Exception {
URI.create(resourceUri + queryParam + "&fromid="
+ entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(3, entities.size());
@@ -2461,7 +2461,7 @@ public void testForFlowRunAppsPagination() throws Exception {
assertEquals(entity5, entities.get(2));
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2474,9 +2474,9 @@ public void testForFlowRunsPagination() throws Exception {
String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/"
+ "timeline/clusters/cluster1/users/user1/flows/flow1/runs";
URI uri = URI.create(resourceUri);
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
List<TimelineEntity> entities =
- resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(totalRuns, entities.size());
@@ -2487,7 +2487,7 @@ public void testForFlowRunsPagination() throws Exception {
String queryParam = "?limit=" + limit;
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(limit, entities.size());
@@ -2497,7 +2497,7 @@ public void testForFlowRunsPagination() throws Exception {
uri = URI.create(resourceUri + queryParam + "&fromid="
+ entity2.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(limit, entities.size());
@@ -2507,13 +2507,13 @@ public void testForFlowRunsPagination() throws Exception {
uri = URI.create(resourceUri + queryParam + "&fromid="
+ entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ entities = resp.readEntity(new GenericType<List<TimelineEntity>>() {
});
assertNotNull(entities);
assertEquals(1, entities.size());
assertEquals(entity3, entities.get(0));
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2525,9 +2525,9 @@ public void testGetAppsMetricsRange() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000) + "&metricstimeend=" + (ts - 100000));
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
@@ -2536,7 +2536,7 @@ public void testGetAppsMetricsRange() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 10);
@@ -2546,7 +2546,7 @@ public void testGetAppsMetricsRange() throws Exception {
"apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000) + "&metricstimeend=" + (ts - 100000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
verifyMetricsCount(entities, 5, 5);
@@ -2555,7 +2555,7 @@ public void testGetAppsMetricsRange() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100");
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
verifyMetricsCount(entities, 5, 12);
@@ -2565,7 +2565,7 @@ public void testGetAppsMetricsRange() throws Exception {
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 10);
@@ -2575,7 +2575,7 @@ public void testGetAppsMetricsRange() throws Exception {
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimeend=" +
(ts - 100000));
resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
@@ -2586,7 +2586,7 @@ public void testGetAppsMetricsRange() throws Exception {
"&metricstimestart=" +(ts - 200000) + "&metricstimeend=" +
(ts - 100000));
resp = getResponse(client, uri);
- TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ TimelineEntity entity = resp.readEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 3);
@@ -2594,9 +2594,9 @@ public void testGetAppsMetricsRange() throws Exception {
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 200000));
- verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST);
} finally {
- client.destroy();
+ client.close();
}
}
@@ -2607,12 +2607,12 @@ public void testGetEntityWithSystemEntityType() throws Exception {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/apps/application_1111111111_1111/" +
"entities/YARN_APPLICATION");
- ClientResponse resp = getResponse(client, uri);
+ Response resp = getResponse(client, uri);
Set<TimelineEntity> entities =
- resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ resp.readEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(0, entities.size());
} finally {
- client.destroy();
+ client.close();
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityListReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityListReader.java
new file mode 100644
index 0000000..5304cc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityListReader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.*;
+
+/**
+ * We have defined a dedicated Reader for `Set<TimelineEntity>`,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into `Set<TimelineEntity>`.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class TimelineEntityListReader implements MessageBodyReader<List<TimelineEntity>> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private String timelineEntityType =
+ "java.util.List<org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity>";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return timelineEntityType.equals(genericType.getTypeName());
+ }
+
+ @Override
+ public List<TimelineEntity> readFrom(Class<List<TimelineEntity>> type,
+ Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ List<TimelineEntity> timelineEntityList = new ArrayList<>();
+
+ JsonNode jsonNode = objectMapper.readTree(entityStream);
+ if (jsonNode.isArray()) {
+ for (JsonNode jNode : jsonNode) {
+ TimelineEntity entity = new TimelineEntity();
+
+ // Get Identifier
+ JsonNode jnIdentifier = jNode.get("identifier");
+ JsonNode jnType = jnIdentifier.get("type");
+ JsonNode jnId = jnIdentifier.get("id");
+ Identifier identifier = new Identifier(jnType.asText(), jnId.asText());
+ entity.setIdentifier(identifier);
+
+ // Get Type
+ JsonNode jnAppType = jNode.get("type");
+ entity.setType(jnAppType.asText());
+
+ // Get Createdtime
+ JsonNode jnCreatedTime = jNode.get("createdtime");
+ entity.setCreatedTime(jnCreatedTime.asLong());
+
+ JsonNode jnMetrics = jNode.get("metrics");
+ Set<TimelineMetric> metricSet = new HashSet<>();
+
+ if (jnMetrics.isArray()) {
+ for (JsonNode metric : jnMetrics) {
+ TimelineMetric timelineMetric = objectMapper.treeToValue(metric, TimelineMetric.class);
+ metricSet.add(timelineMetric);
+ System.out.println(metric);
+ }
+ }
+ entity.setMetrics(metricSet);
+
+ // Get configs
+ JsonNode jnConfigs = jNode.get("configs");
+ if (jnConfigs != null) {
+ Map<String, String> configInfos =
+ objectMapper.treeToValue(jnConfigs, Map.class);
+ entity.setConfigs(configInfos);
+ }
+
+ // Get info
+ JsonNode jnInfos = jNode.get("info");
+ if (jnInfos != null) {
+ Map<String, Object> entityInfos =
+ objectMapper.treeToValue(jnInfos, Map.class);
+ entity.setInfo(entityInfos);
+ }
+
+ // Get idprefix
+ JsonNode jnIdprefix = jNode.get("idprefix");
+ entity.setIdPrefix(jnIdprefix.asLong());
+
+ timelineEntityList.add(entity);
+ }
+ }
+
+ return timelineEntityList;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntitySetReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntitySetReader.java
new file mode 100644
index 0000000..0e56a10
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntitySetReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * We have defined a dedicated Reader for `Set<TimelineEntity>`,
+ * aimed at adapting to the Jersey2 framework
+ * to ensure that JSON can be converted into `Set<TimelineEntity>`.
+ */
+@Provider
+@Consumes(MediaType.APPLICATION_JSON)
+public class TimelineEntitySetReader implements MessageBodyReader<Set<TimelineEntity>> {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private String timelineEntityType =
+ "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity>";
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return timelineEntityType.equals(genericType.getTypeName());
+ }
+
+ @Override
+ public Set<TimelineEntity> readFrom(Class<Set<TimelineEntity>> type,
+ Type genericType, Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+ Set<TimelineEntity> timelineEntitySet = new HashSet<>();
+
+ JsonNode jsonNode = objectMapper.readTree(entityStream);
+ if (jsonNode.isArray()) {
+ for (JsonNode jNode : jsonNode) {
+ TimelineEntity entity = new TimelineEntity();
+
+ // Get Identifier
+ JsonNode jnIdentifier = jNode.get("identifier");
+ JsonNode jnType = jnIdentifier.get("type");
+ JsonNode jnId = jnIdentifier.get("id");
+ Identifier identifier = new Identifier(jnType.asText(), jnId.asText());
+ entity.setIdentifier(identifier);
+
+ // Get Type
+ JsonNode jnAppType = jNode.get("type");
+ entity.setType(jnAppType.asText());
+
+ // Get Createdtime
+ JsonNode jnCreatedTime = jNode.get("createdtime");
+ entity.setCreatedTime(jnCreatedTime.asLong());
+
+ // Get metrics
+ JsonNode jnMetrics = jNode.get("metrics");
+ Set<TimelineMetric> metricSet = new HashSet<>();
+
+ if (jnMetrics.isArray()) {
+ for (JsonNode metric : jnMetrics) {
+ TimelineMetric timelineMetric = objectMapper.treeToValue(metric, TimelineMetric.class);
+ metricSet.add(timelineMetric);
+ }
+ }
+ entity.setMetrics(metricSet);
+
+ // Get configs
+ JsonNode jnConfigs = jNode.get("configs");
+ if (jnConfigs != null) {
+ Map<String, String> configInfos =
+ objectMapper.treeToValue(jnConfigs, Map.class);
+ entity.setConfigs(configInfos);
+ }
+
+ // Get info
+ JsonNode jnInfos = jNode.get("info");
+ if (jnInfos != null) {
+ Map<String, Object> entityInfos =
+ objectMapper.treeToValue(jnInfos, Map.class);
+ entity.setInfo(entityInfos);
+ }
+
+
+ // Get idprefix
+ JsonNode jnIdprefix = jNode.get("idprefix");
+ entity.setIdPrefix(jnIdprefix.asLong());
+
+ timelineEntitySet.add(entity);
+ }
+ }
+
+ return timelineEntitySet;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index c3ee758..f34a6dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -251,7 +251,7 @@ public void testWriteFlowRunMinMax() throws Exception {
new TimelineDataToRetrieve());
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
FlowRunEntity flowRun = (FlowRunEntity)entity;
- assertEquals(minStartTs, flowRun.getStartTime());
+ assertEquals(minStartTs, flowRun.getStartTime().longValue());
assertEquals(endTs, flowRun.getMaxEndTime());
} finally {
if (hbr != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 7233dab..9300825 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -646,7 +646,9 @@ public void flush() throws IOException {
protected void serviceStop() throws Exception {
boolean isStorageUp = true;
try {
- storageMonitor.checkStorageIsUp();
+ if (storageMonitor != null) {
+ storageMonitor.checkStorageIsUp();
+ }
} catch (IOException e) {
LOG.warn("Failed to close the timeline tables as Hbase is down", e);
isStorageUp = false;
@@ -688,7 +690,9 @@ protected void serviceStop() throws Exception {
conn.close();
}
}
- storageMonitor.stop();
+ if (storageMonitor != null) {
+ storageMonitor.stop();
+ }
super.serviceStop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index f2e7c5c..c0f61f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -29,7 +29,6 @@
<properties>
<hadoop.common.build.dir>${basedir}/../../../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
- <hbase-compatible-hadoop.version>3.3.6</hbase-compatible-hadoop.version>
</properties>
<!-- Do not add dependencies here, add them to the POM of the leaf module -->