blob: 7046f8beab1da027f0b5f25124d8023338515c3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.jpm.mr.running.parser;
import com.sun.jersey.api.client.Client;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobConfig;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.eagle.jpm.util.resourcefetch.model.AppsWrapper;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.*;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@RunWith(PowerMockRunner.class)
@PrepareForTest({InputStreamUtils.class, MRJobParser.class, URLConnectionUtils.class, Math.class, MRJobEntityCreationHandler.class})
@PowerMockIgnore({"javax.*", "org.w3c.*", "com.sun.org.apache.xerces.*","org.apache.xerces.*"})
public class MRJobParserTest {
private static final String ZK_JOB_PATH = "/apps/mr/running/sandbox/jobs/application_1479206441898_30784/job_1479206441898_30784";
private static final String ZK_APP_PATH = "/apps/mr/running/sandbox/jobs/application_1479206441898_30784";
private static final String JOB_CONF_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/conf?anonymous=true";
private static final String JOB_COUNT_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/counters?anonymous=true";
private static final String JOB_ID = "job_1479206441898_30784";
private static final String JOB_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs?anonymous=true";
private static final String DATA_FROM_ZK = "{\"entityTags\":\"{\\\"jobName\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"jobId\\\":\\\"job_1479206441898_30784\\\",\\\"site\\\":\\\"sandbox\\\",\\\"jobDefId\\\":\\\"eagletest\\\",\\\"jobType\\\":\\\"HIVE\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\"}\",\"appInfo\":\"{\\\"applicationType\\\":\\\"MAPREDUCE\\\",\\\"startedTime\\\":\\\"1479328221694\\\",\\\"finalStatus\\\":\\\"UNDEFINED\\\",\\\"trackingUrl\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/proxy\\\\\\/application_1479206441898_30784\\\\\\/\\\",\\\"runningContainers\\\":\\\"2\\\",\\\"trackingUI\\\":\\\"ApplicationMaster\\\",\\\"clusterId\\\":\\\"1479206441898\\\",\\\"amContainerLogs\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/node\\\\\\/containerlogs\\\\\\/container_e11_1479206441898_30784_01_000001\\\\\\/xxx\\\",\\\"allocatedVCores\\\":\\\"2\\\",\\\"diagnostics\\\":\\\"\\\",\\\"name\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"progress\\\":\\\"95.0\\\",\\\"finishedTime\\\":\\\"0\\\",\\\"allocatedMB\\\":\\\"3072\\\",\\\"id\\\":\\\"application_1479206441898_30784\\\",\\\"state\\\":\\\"RUNNING\\\",\\\"amHostHttpAddress\\\":\\\"host.domain.com:8088\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\",\\\"elapsedTime\\\":\\\"13367402\\\"}\"}";
private static TestingServer zk;
private static String ZKROOT;
private static MRRunningJobConfig mrRunningJobConfig;
private static Config config = ConfigFactory.load();
private static CuratorFramework curator;
private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
private EagleServiceClientImpl client;
@BeforeClass
public static void startZookeeper() throws Exception {
zk = new TestingServer();
curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(1));
mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
mrRunningJobConfig.getZkStateConfig().zkQuorum = zk.getConnectString();
ZKROOT = mrRunningJobConfig.getZkStateConfig().zkRoot;
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
curator.start();
}
@AfterClass
public static void teardownZookeeper() throws IOException {
curator.close();
zk.stop();
}
@Before
public void cleanZkPath() throws Exception {
if (curator.checkExists().forPath(ZK_JOB_PATH) != null) {
curator.delete().deletingChildrenIfNeeded().forPath(ZK_JOB_PATH);
}
if (curator.checkExists().forPath(ZK_APP_PATH) != null) {
curator.delete().deletingChildrenIfNeeded().forPath(ZK_APP_PATH);
}
if (curator.checkExists().forPath(ZKROOT) != null) {
curator.delete().deletingChildrenIfNeeded().forPath(ZKROOT);
}
}
@Test
public void testMRJobParser() throws Exception {
//TODO fetch task attempt when(Math.random()).thenReturn(0.0); http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/tasks?anonymous=true
setupMock();
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockInputJobSteam("/jobcounts_30784.json", JOB_COUNT_URL);
mockGetConnection("/mrconf_30784.xml");
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
List<AppInfo> appInfos = appsWrapper.getApps().getApp();
AppInfo app1 = appInfos.get(0);
Map<String, JobExecutionAPIEntity> mrJobs = null;
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(entities.isEmpty());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
mrJobParser.run();
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
Assert.assertEquals("AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}", jobExecutionAPIEntity.getAppInfo().toString());
Assert.assertEquals("RUNNING", jobExecutionAPIEntity.getCurrentState());
Assert.assertEquals("RUNNING", jobExecutionAPIEntity.getInternalState());
Assert.assertEquals("prefix:null, timestamp:1479328221694, humanReadableDate:2016-11-16 20:30:21,694, tags: jobName=oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W,jobId=job_1479206441898_30784,site=sandbox,jobDefId=eagletest,jobType=HIVE,user=xxx,queue=xxx,, encodedRowkey:null", jobExecutionAPIEntity.toString());
//Assert.assertEquals("prefix:null, timestamp:1479328221694, humanReadableDate:2016-11-16 20:30:21,694, tags: jobName=oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W,jobId=job_1479206441898_30784,site=sandbox,jobDefId=oozie:launcher-shell-wf_co_xxx_xxx_v3-extract_org_data~,jobType=HIVE,user=xxx,queue=xxx,, encodedRowkey:null", jobExecutionAPIEntity.toString());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertEquals("{eagle.job.name=eagletest, hive.optimize.skewjoin.compiletime=false, hive.query.string=insert overwrite table xxxx}", jobExecutionAPIEntity.getJobConfig().toString());
Assert.assertTrue(jobIdToJobConfig.size() == 1);
Assert.assertEquals("{eagle.job.name=eagletest, hive.optimize.skewjoin.compiletime=false, hive.query.string=insert overwrite table xxxx}", jobIdToJobConfig.get(JOB_ID).toString());
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) != null);
Assert.assertEquals(DATA_FROM_ZK, new String(curator.getData().forPath(ZK_JOB_PATH), "UTF-8"));
Assert.assertTrue(entities.isEmpty());
verify(client, times(1)).create(any());
}
@Test
public void testMRJobParserFetchMrJobFail() throws Exception {
setupMock();
mockInputJobSteamWithException(JOB_URL);
mockGetConnection("/mrconf_30784.xml");
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
List<AppInfo> appInfos = appsWrapper.getApps().getApp();
AppInfo app1 = appInfos.get(0);
Map<String, JobExecutionAPIEntity> mrJobs = null;
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(entities.isEmpty());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
mrJobParser.run();
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 0);
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
verify(client, never()).create(any());
}
@Test
public void testMRJobParserFetchJobConfFail() throws Exception {
setupMock();
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockGetConnectionWithException("/mrconf_30784.xml");
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
List<AppInfo> appInfos = appsWrapper.getApps().getApp();
AppInfo app1 = appInfos.get(0);
Map<String, JobExecutionAPIEntity> mrJobs = null;
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(entities.isEmpty());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
mrJobParser.run();
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
verify(client, times(1)).create(any());
}
@Test
public void testMRJobParserFetchJobCountFail() throws Exception {
setupMock();
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockGetConnection("/mrconf_30784.xml");
mockInputJobSteamWithException(JOB_COUNT_URL);
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
List<AppInfo> appInfos = appsWrapper.getApps().getApp();
AppInfo app1 = appInfos.get(0);
Map<String, JobExecutionAPIEntity> mrJobs = null;
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(entities.isEmpty());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
mrJobParser.run();
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) != null);
Assert.assertEquals(DATA_FROM_ZK, new String(curator.getData().forPath(ZK_JOB_PATH), "UTF-8"));
Assert.assertTrue(entities.isEmpty());
verify(client, times(1)).create(any());
}
@Test
public void testMRJobParserFetchJobConfFailButRMalive() throws Exception {
setupMock();
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockGetConnectionWithException("/mrconf_30784.xml");
mockInputJobSteamWithException(JOB_COUNT_URL);
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
List<AppInfo> appInfos = appsWrapper.getApps().getApp();
AppInfo app1 = appInfos.get(0);
Map<String, JobExecutionAPIEntity> mrJobs = null;
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(entities.isEmpty());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
mrJobParser.run();
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState());
Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
verify(client, times(1)).create(any());
}
@Test
public void testMRJobParserFetchJobCountFailButRMaliveRetry() throws Exception {
setupMock();
reset(client);
client = mock(EagleServiceClientImpl.class);
MRRunningJobConfig.EagleServiceConfig eagleServiceConfig = mrRunningJobConfig.getEagleServiceConfig();
PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(
eagleServiceConfig.eagleServiceHost,
eagleServiceConfig.eagleServicePort,
eagleServiceConfig.username,
eagleServiceConfig.password).thenReturn(client);
when(client.create(any())).thenThrow(Exception.class).thenReturn(null);
//when(client.getJerseyClient()).thenReturn(new Client());
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockInputJobSteamWithException(JOB_COUNT_URL);
mockGetConnection("/mrconf_30784.xml");
Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
List<AppInfo> appInfos = appsWrapper.getApps().getApp();
AppInfo app1 = appInfos.get(0);
Map<String, JobExecutionAPIEntity> mrJobs = null;
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(entities.isEmpty());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
mrJobParser.run();
Assert.assertTrue(!jobIdToJobConfig.isEmpty());
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState());
Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
Assert.assertTrue(entities.isEmpty());
verify(client, times(2)).create(any());
//verify(client, times(1)).getJerseyClient();
verify(client, times(1)).close();
}
private void setupMock() throws Exception {
mockStatic(Math.class);
when(Math.random()).thenReturn(0.3689680489913364d);
mockStatic(InputStreamUtils.class);
client = mock(EagleServiceClientImpl.class);
MRRunningJobConfig.EagleServiceConfig eagleServiceConfig = mrRunningJobConfig.getEagleServiceConfig();
PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(
eagleServiceConfig.eagleServiceHost,
eagleServiceConfig.eagleServicePort,
eagleServiceConfig.username,
eagleServiceConfig.password).thenReturn(client);
when(client.create(any())).thenReturn(null);
when(client.getJerseyClient()).thenReturn(new Client());
}
private Map<String, JobConfig> getMrJobConfigs(MRJobParser mrJobParser) throws NoSuchFieldException, IllegalAccessException {
Field mrJobConfigs = MRJobParser.class.getDeclaredField("mrJobConfigs");
mrJobConfigs.setAccessible(true);
return (Map<String, JobConfig>) mrJobConfigs.get(mrJobParser);
}
private Map<String, JobExecutionAPIEntity> getMrJobs(MRJobParser mrJobParser) throws NoSuchFieldException, IllegalAccessException {
Field mrJobEntityMap = MRJobParser.class.getDeclaredField("mrJobEntityMap");
mrJobEntityMap.setAccessible(true);
return (Map<String, JobExecutionAPIEntity>) mrJobEntityMap.get(mrJobParser);
}
private MRJobEntityCreationHandler getMrJobEntityCreationHandler(MRJobParser mrJobParser) throws NoSuchFieldException, IllegalAccessException {
Field mrJobEntityCreationHandler = MRJobParser.class.getDeclaredField("mrJobEntityCreationHandler");
mrJobEntityCreationHandler.setAccessible(true);
return (MRJobEntityCreationHandler) mrJobEntityCreationHandler.get(mrJobParser);
}
private List<TaggedLogAPIEntity> getMrJobEntityCreationHandlerEntities(MRJobEntityCreationHandler mrJobEntityCreationHandler) throws NoSuchFieldException, IllegalAccessException {
Field entities = MRJobEntityCreationHandler.class.getDeclaredField("entities");
entities.setAccessible(true);
return (ArrayList<TaggedLogAPIEntity>) entities.get(mrJobEntityCreationHandler);
}
private void initMrJobEntityCreationHandlerEntities(MRJobEntityCreationHandler mrJobEntityCreationHandler) throws NoSuchFieldException, IllegalAccessException {
Field entities = MRJobEntityCreationHandler.class.getDeclaredField("entities");
entities.setAccessible(true);
entities.set(mrJobEntityCreationHandler, new ArrayList<>());
}
private void initMetricsCreationListener(MRJobEntityCreationHandler mrJobEntityCreationHandler) throws IllegalAccessException, NoSuchFieldException {
Field jobMetricsListener = MRJobEntityCreationHandler.class.getDeclaredField("jobMetricsListener");
jobMetricsListener.setAccessible(true);
jobMetricsListener.set(mrJobEntityCreationHandler, new JobExecutionMetricsCreationListener());
}
private List<String> makeConfKeyKeys(MRRunningJobConfig mrRunningJobConfig) {
String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",");
List<String> confKeyKeys = new ArrayList<>(confKeyPatternsSplit.length);
for (String confKeyPattern : confKeyPatternsSplit) {
confKeyKeys.add(confKeyPattern.trim());
}
confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
confKeyKeys.add("hive.optimize.skewjoin.compiletime");
confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
return confKeyKeys;
}
private void mockInputJobSteam(String mockDataFilePath, String url) throws Exception {
InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenReturn(jsonstream);
}
private void mockInputJobSteamWithException(String url) throws Exception {
when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenThrow(new Exception());
}
private void mockGetConnectionWithException(String mockDataFilePath) throws Exception {
InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
mockStatic(URLConnectionUtils.class);
URLConnection connection = mock(URLConnection.class);
when(connection.getInputStream()).thenReturn(jsonstream);
when(URLConnectionUtils.getConnection(JOB_CONF_URL)).thenThrow(new Exception());
}
private void mockGetConnection(String mockDataFilePath) throws Exception {
InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
mockStatic(URLConnectionUtils.class);
URLConnection connection = mock(URLConnection.class);
when(connection.getInputStream()).thenReturn(jsonstream);
when(URLConnectionUtils.getConnection(JOB_CONF_URL)).thenReturn(connection);
}
}