| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.webapp; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import java.io.IOException; |
| import java.io.StringReader; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.commons.io.input.ReaderInputStream; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpHost; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.StatusLine; |
| import org.apache.http.client.methods.CloseableHttpResponse; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.impl.client.CloseableHttpClient; |
| import org.apache.samza.Partition; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.clustermanager.SamzaApplicationState; |
| import org.apache.samza.clustermanager.SamzaResource; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.container.TaskName; |
| import org.apache.samza.container.grouper.task.GroupByContainerCount; |
| import org.apache.samza.coordinator.JobModelManager; |
| import org.apache.samza.job.model.ContainerModel; |
| import org.apache.samza.job.model.JobModel; |
| import org.apache.samza.job.model.TaskModel; |
| import org.apache.samza.job.yarn.SamzaAppMasterMetrics; |
| import org.apache.samza.job.yarn.YarnAppState; |
| import org.apache.samza.job.yarn.YarnContainer; |
| import org.apache.samza.metrics.MetricsRegistryMap; |
| import org.apache.samza.serializers.model.SamzaObjectMapper; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.type.TypeReference; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| |
| import static org.junit.Assert.*; |
| import static org.mockito.Mockito.*; |
| |
| |
| public class TestApplicationMasterRestClient { |
| private static final String AM_HOST_NAME = "dummyHost"; |
| private static final int AM_RPC_PORT = 1337; |
| private static final int AM_HTTP_PORT = 7001; |
| private static final String YARN_CONTAINER_ID_1 = "container_e38_1510966221296_0007_01_000001"; |
| private static final String YARN_CONTAINER_ID_2 = "container_e38_1510966221296_0007_01_000002"; |
| private static final String YARN_CONTAINER_ID_3 = "container_e38_1510966221296_0007_01_000003"; |
| private static final String APP_ATTEMPT_ID = "appattempt_1510966221296_0007_000001"; |
| |
| private final ObjectMapper jsonMapper = SamzaObjectMapper.getObjectMapper(); |
| |
| private CloseableHttpClient mockClient; |
| |
| @Before |
| public void setup() { |
| mockClient = mock(CloseableHttpClient.class); |
| } |
| |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); // Enables us to verify the exception message |
| |
| @Test |
| public void testGetMetricsSuccess() throws IOException { |
| SamzaApplicationState samzaAppState = createSamzaApplicationState(); |
| |
| MetricsRegistryMap registry = new MetricsRegistryMap(); |
| assignMetricValues(samzaAppState, registry); |
| |
| String response = ApplicationMasterRestServlet.getMetrics(jsonMapper, registry); |
| setupMockClientResponse(HttpStatus.SC_OK, "Success", response); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| Map<String, Map<String, Object>> metricsResult = client.getMetrics(); |
| |
| String group = SamzaAppMasterMetrics.class.getCanonicalName(); |
| assertEquals(1, metricsResult.size()); |
| assertTrue(metricsResult.containsKey(group)); |
| |
| Map<String, Object> amMetricsGroup = metricsResult.get(group); |
| assertEquals(8, amMetricsGroup.size()); |
| assertEquals(samzaAppState.runningProcessors.size(), amMetricsGroup.get("running-containers")); |
| assertEquals(samzaAppState.neededProcessors.get(), amMetricsGroup.get("needed-containers")); |
| assertEquals(samzaAppState.completedProcessors.get(), amMetricsGroup.get("completed-containers")); |
| assertEquals(samzaAppState.failedContainers.get(), amMetricsGroup.get("failed-containers")); |
| assertEquals(samzaAppState.releasedContainers.get(), amMetricsGroup.get("released-containers")); |
| assertEquals(samzaAppState.processorCount.get(), amMetricsGroup.get("container-count")); |
| assertEquals(samzaAppState.jobHealthy.get() ? 1 : 0, amMetricsGroup.get("job-healthy")); |
| assertEquals(0, amMetricsGroup.get("container-from-previous-attempt")); |
| } |
| |
| @Test |
| public void testGetMetricsError() throws IOException { |
| setupErrorTest("metrics"); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| client.getMetrics(); |
| } |
| |
| @Test |
| public void testGetTaskContextSuccess() throws IOException { |
| ContainerId containerId = ConverterUtils.toContainerId(YARN_CONTAINER_ID_1); |
| YarnAppState yarnAppState = createYarnAppState(containerId); |
| |
| String response = ApplicationMasterRestServlet.getTaskContext(jsonMapper, yarnAppState); |
| setupMockClientResponse(HttpStatus.SC_OK, "Success", response); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| Map<String, Object> taskContextResult = client.getTaskContext(); |
| |
| assertEquals(2, taskContextResult.size()); |
| assertEquals(2, taskContextResult.get("task-id")); |
| assertEquals(containerId.toString(), taskContextResult.get("name")); |
| } |
| |
| @Test |
| public void testTaskContextError() throws IOException { |
| setupErrorTest("task context"); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| client.getTaskContext(); |
| } |
| |
| @Test |
| public void testGetAmStateSuccess() throws IOException { |
| SamzaApplicationState samzaAppState = createSamzaApplicationState(); |
| |
| ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(APP_ATTEMPT_ID); |
| ContainerId containerId = ConverterUtils.toContainerId(YARN_CONTAINER_ID_1); |
| YarnAppState yarnAppState = createYarnAppState(containerId); |
| |
| String response = ApplicationMasterRestServlet.getAmState(jsonMapper, samzaAppState, yarnAppState); |
| setupMockClientResponse(HttpStatus.SC_OK, "Success", response); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| Map<String, Object> amStateResult = client.getAmState(); |
| |
| assertEquals(4, amStateResult.size()); |
| assertEquals(String.format("%s:%s", yarnAppState.nodeHost, yarnAppState.rpcUrl.getPort()), amStateResult.get("host")); |
| assertEquals(containerId.toString(), amStateResult.get("container-id")); |
| // Can only validate the keys because up-time changes everytime it's requested |
| assertEquals(buildExpectedContainerResponse(yarnAppState.runningProcessors, samzaAppState).keySet(), |
| ((Map<String, Object>) amStateResult.get("containers")).keySet()); |
| assertEquals(attemptId.toString(), amStateResult.get("app-attempt-id")); |
| } |
| |
| @Test |
| public void testGetAmStateError() throws IOException { |
| setupErrorTest("AM state"); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| client.getAmState(); |
| } |
| |
| @Test |
| public void testGetConfigSuccess() throws IOException { |
| SamzaApplicationState samzaAppState = createSamzaApplicationState(); |
| |
| Map<String, String> configMap = ImmutableMap.of("key1", "value1", "key2", "value2"); |
| Config config = new MapConfig(configMap); |
| |
| String response = ApplicationMasterRestServlet.getConfig(jsonMapper, config); |
| setupMockClientResponse(HttpStatus.SC_OK, "Success", response); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| Map<String, Object> configResult = client.getConfig(); |
| |
| assertEquals(configMap, configResult); |
| } |
| |
| @Test |
| public void testGetConfigError() throws IOException { |
| setupErrorTest("config"); |
| |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| client.getConfig(); |
| } |
| |
| @Test |
| public void testCloseMethodClosesHttpClient() throws IOException { |
| ApplicationMasterRestClient client = new ApplicationMasterRestClient(mockClient, AM_HOST_NAME, AM_RPC_PORT); |
| client.close(); |
| |
| verify(mockClient).close(); |
| } |
| |
| private void setupMockClientResponse(int statusCode, String statusReason, String responseBody) throws IOException { |
| StatusLine statusLine = mock(StatusLine.class); |
| when(statusLine.getStatusCode()).thenReturn(statusCode); |
| when(statusLine.getReasonPhrase()).thenReturn(statusReason); |
| |
| HttpEntity entity = mock(HttpEntity.class); |
| when(entity.getContent()).thenReturn(new ReaderInputStream(new StringReader(responseBody))); |
| |
| CloseableHttpResponse response = mock(CloseableHttpResponse.class); |
| when(response.getStatusLine()).thenReturn(statusLine); |
| when(response.getEntity()).thenReturn(entity); |
| |
| when(mockClient.execute(any(HttpHost.class), any(HttpGet.class))).thenReturn(response); |
| } |
| |
| private SamzaApplicationState createSamzaApplicationState() { |
| HashMap<String, ContainerModel> containers = generateContainers(); |
| |
| JobModel mockJobModel = mock(JobModel.class); |
| when(mockJobModel.getContainers()).thenReturn(containers); |
| JobModelManager mockJobModelManager = mock(JobModelManager.class); |
| when(mockJobModelManager.jobModel()).thenReturn(mockJobModel); |
| |
| SamzaApplicationState samzaApplicationState = new SamzaApplicationState(mockJobModelManager); |
| |
| samzaApplicationState.runningProcessors.put(YARN_CONTAINER_ID_3, |
| new SamzaResource(1, 2, "dummyNodeHost1", "dummyResourceId1")); |
| samzaApplicationState.runningProcessors.put(YARN_CONTAINER_ID_2, |
| new SamzaResource(2, 4, "dummyNodeHost2", "dummyResourceId2")); |
| return samzaApplicationState; |
| } |
| |
| private YarnAppState createYarnAppState(ContainerId containerId) throws MalformedURLException { |
| YarnAppState yarnAppState = new YarnAppState(2, containerId, AM_HOST_NAME, AM_RPC_PORT, AM_HTTP_PORT); |
| yarnAppState.rpcUrl = new URL(new HttpHost(AM_HOST_NAME, AM_RPC_PORT).toURI()); |
| yarnAppState.runningProcessors.put("0", new YarnContainer(Container.newInstance( |
| ConverterUtils.toContainerId(YARN_CONTAINER_ID_2), |
| ConverterUtils.toNodeIdWithDefaultPort("dummyNodeHost1"), |
| "dummyNodeHttpHost1", |
| null, |
| null, |
| null |
| ))); |
| yarnAppState.runningProcessors.put("1", new YarnContainer(Container.newInstance( |
| ConverterUtils.toContainerId(YARN_CONTAINER_ID_3), |
| ConverterUtils.toNodeIdWithDefaultPort("dummyNodeHost2"), |
| "dummyNodeHttpHost2", |
| null, |
| null, |
| null |
| ))); |
| return yarnAppState; |
| } |
| |
| private HashMap<String, ContainerModel> generateContainers() { |
| Set<TaskModel> taskModels = ImmutableSet.of( |
| new TaskModel(new TaskName("task1"), |
| ImmutableSet.of(new SystemStreamPartition(new SystemStream("system1", "stream1"), new Partition(0))), |
| new Partition(0)), |
| new TaskModel(new TaskName("task2"), |
| ImmutableSet.of(new SystemStreamPartition(new SystemStream("system1", "stream1"), new Partition(1))), |
| new Partition(1))); |
| GroupByContainerCount grouper = new GroupByContainerCount(2); |
| Set<ContainerModel> containerModels = grouper.group(taskModels); |
| HashMap<String, ContainerModel> containers = new HashMap<>(); |
| for (ContainerModel containerModel : containerModels) { |
| containers.put(containerModel.getId(), containerModel); |
| } |
| return containers; |
| } |
| |
| private Map<String, Map<String, Object>> buildExpectedContainerResponse(Map<String, YarnContainer> runningYarnContainers, |
| SamzaApplicationState samzaAppState) throws IOException { |
| Map<String, Map<String, Object>> containers = new HashMap<>(); |
| |
| runningYarnContainers.forEach((containerId, container) -> { |
| String yarnContainerId = container.id().toString(); |
| Map<String, Object> containerMap = new HashMap(); |
| Map<TaskName, TaskModel> taskModels = samzaAppState.jobModelManager.jobModel().getContainers().get(containerId).getTasks(); |
| containerMap.put("yarn-address", container.nodeHttpAddress()); |
| containerMap.put("start-time", String.valueOf(container.startTime())); |
| containerMap.put("up-time", String.valueOf(container.upTime())); |
| containerMap.put("task-models", taskModels); |
| containerMap.put("container-id", containerId); |
| containers.put(yarnContainerId, containerMap); |
| }); |
| |
| return jsonMapper.readValue(jsonMapper.writeValueAsString(containers), new TypeReference<Map<String, Map<String, Object>>>() { }); |
| } |
| |
| private void assignMetricValues(SamzaApplicationState samzaAppState, MetricsRegistryMap registry) { |
| SamzaAppMasterMetrics metrics = new SamzaAppMasterMetrics(new MapConfig(), samzaAppState, registry); |
| metrics.start(); |
| samzaAppState.runningProcessors.put("dummyContainer", |
| new SamzaResource(1, 2, AM_HOST_NAME, "dummyResourceId")); // 1 container |
| samzaAppState.neededProcessors.set(2); |
| samzaAppState.completedProcessors.set(3); |
| samzaAppState.failedContainers.set(4); |
| samzaAppState.releasedContainers.set(5); |
| samzaAppState.processorCount.set(6); |
| samzaAppState.jobHealthy.set(true); |
| } |
| |
| private void setupErrorTest(String entityToFetch) throws IOException { |
| String statusReason = "Dummy status reason"; |
| expectedException.expect(SamzaException.class); |
| expectedException.expectMessage(String.format( |
| "Error retrieving %s from host %s. Response: %s", |
| entityToFetch, |
| new HttpHost(AM_HOST_NAME, AM_RPC_PORT).toURI(), |
| statusReason)); |
| |
| setupMockClientResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, statusReason, ""); |
| } |
| } |