// blob: eb48ff66d3c38492a41b9bb2faa1612f9c9e1f7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.jpm.mr.running;
import com.typesafe.config.ConfigFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
import org.apache.zookeeper.CreateMode;
import org.junit.*;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
/**
 * Multi-threaded tests that drive {@link MRRunningJobManager} against an in-process
 * ZooKeeper {@link TestingServer}.
 *
 * <p>Each test starts {@link #QTY} worker threads; every worker builds its own
 * {@code MRRunningJobManager} and calls it {@link #REPETITIONS} times. {@code LoggerFactory}
 * is statically mocked so that every requested logger is the shared mock {@link #log}, which
 * lets the tests assert that no {@code error(...)} call was made.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({MRRunningJobManager.class, RunningJobManager.class, LoggerFactory.class})
@PowerMockIgnore({"javax.*"})
public class MRRunningJobManagerTest {
    private static TestingServer zk;
    private static com.typesafe.config.Config config = ConfigFactory.load();
    private static CuratorFramework curator;
    /** Znode created before and removed after every test. */
    private static final String SHARE_RESOURCES = "/apps/mr/running/sandbox/yarnAppId/jobId";
    /** Number of worker threads per test. */
    private static final int QTY = 5;
    /** Number of manager calls issued by each worker thread. */
    private static final int REPETITIONS = QTY * 10;
    private static MRRunningJobConfig.EndpointConfig endpointConfig;
    private static MRRunningJobConfig.ZKStateConfig zkStateConfig;
    /** Mock handed out by the statically mocked {@code LoggerFactory}. */
    private static org.slf4j.Logger log = mock(org.slf4j.Logger.class);
    private static final int BUFFER_SIZE = 4096;
    // Removed after every test; presumably where the manager keeps its lock nodes -- TODO confirm.
    private static final String LOCKS_BASE_PATH = "/locks";
    /** Classpath location of the JSON payload stored in {@link #SHARE_RESOURCES}. */
    private static final String JOB_INFO_RESOURCE = "/jobInfo_805.json";
    /** Upper bound, in minutes, for all workers of one test to finish. */
    private static final long WORKER_TIMEOUT_MINUTES = 10;

    /** One call against a worker's manager; {@code iteration} runs from 0 to REPETITIONS - 1. */
    @FunctionalInterface
    private interface ManagerAction {
        void apply(MRRunningJobManager manager, int iteration) throws Exception;
    }

    /**
     * Starts the embedded ZooKeeper server and a Curator client, points the ZK state config
     * at that server, and makes {@code LoggerFactory.getLogger(Class)} return the mock logger.
     */
    @BeforeClass
    public static void setupZookeeper() throws Exception {
        zk = new TestingServer();
        curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        curator.start();
        MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
        zkStateConfig = mrRunningJobConfig.getZkStateConfig();
        zkStateConfig.zkQuorum = zk.getConnectString();
        endpointConfig = mrRunningJobConfig.getEndpointConfig();
        mockStatic(LoggerFactory.class);
        when(LoggerFactory.getLogger(any(Class.class))).thenReturn(log);
    }

    /** Closes the Curator client first, then the embedded server. */
    @AfterClass
    public static void teardownZookeeper() throws Exception {
        CloseableUtils.closeQuietly(curator);
        CloseableUtils.closeQuietly(zk);
    }

    /** Creates {@link #SHARE_RESOURCES} (with parents) as a persistent node if it is absent. */
    @Before
    public void createPath() throws Exception {
        if (curator.checkExists().forPath(SHARE_RESOURCES) == null) {
            curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(SHARE_RESOURCES);
        }
    }

    /** Removes {@link #SHARE_RESOURCES} and {@link #LOCKS_BASE_PATH} when they exist. */
    @After
    public void cleanPath() throws Exception {
        if (curator.checkExists().forPath(SHARE_RESOURCES) != null) {
            curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES);
        }
        if (curator.checkExists().forPath(LOCKS_BASE_PATH) != null) {
            curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(LOCKS_BASE_PATH);
        }
    }

    /**
     * All workers repeatedly delete the same job; afterwards the node must be gone and the
     * mocked logger must not have received any of the checked {@code error(...)} shapes.
     */
    @Test
    @Ignore
    public void testMRRunningJobManagerDelWithLock() throws Exception {
        Assert.assertNotNull(curator.checkExists().forPath(SHARE_RESOURCES));
        runOnWorkers((manager, iteration) -> manager.delete("yarnAppId", "jobId"));
        Assert.assertNull(curator.checkExists().forPath(SHARE_RESOURCES));
        verify(log, never()).error(anyString(), anyString(), anyString(), anyString(), any(Throwable.class));
        verify(log, never()).error(anyString(), anyString(), anyString());
        verify(log, never()).error(anyString(), any(Throwable.class));
    }

    /**
     * Workers interleave {@code delete} (every third iteration) with {@code recoverYarnApp};
     * the mocked logger must not have received {@code error(String, Throwable)}.
     */
    @Test
    @Ignore
    public void testMRRunningJobManagerRecoverYarnAppWithLock() throws Exception {
        Assert.assertNotNull(curator.checkExists().forPath(SHARE_RESOURCES));
        curator.setData().forPath(SHARE_RESOURCES, generateZkSetData());
        runOnWorkers((manager, iteration) -> {
            if (iteration % 3 == 0) {
                manager.delete("yarnAppId", "jobId");
            } else {
                manager.recoverYarnApp("yarnAppId");
            }
        });
        verify(log, never()).error(anyString(), any(Throwable.class));
    }

    /**
     * Workers interleave {@code delete} (every third iteration) with {@code recover};
     * the mocked logger must not have received {@code error(String, Throwable)}.
     */
    @Test
    public void testMRRunningJobManagerRecoverWithLock() throws Exception {
        Assert.assertNotNull(curator.checkExists().forPath(SHARE_RESOURCES));
        curator.setData().forPath(SHARE_RESOURCES, generateZkSetData());
        runOnWorkers((manager, iteration) -> {
            if (iteration % 3 == 0) {
                manager.delete("yarnAppId", "jobId");
            } else {
                manager.recover();
            }
        });
        verify(log, never()).error(anyString(), any(Throwable.class));
    }

    /**
     * Runs {@code action} {@link #REPETITIONS} times on each of {@link #QTY} threads, every
     * thread using its own {@code MRRunningJobManager}, and waits for all of them to finish.
     *
     * <p>Fails the calling test if the workers are still running after
     * {@link #WORKER_TIMEOUT_MINUTES} minutes, so that later assertions never race with
     * threads that are still touching ZooKeeper.
     *
     * @param action the call to issue for each iteration
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void runOnWorkers(ManagerAction action) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; ++i) {
            Callable<Void> task = () -> {
                try {
                    MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig);
                    for (int j = 0; j < REPETITIONS; ++j) {
                        action.apply(mrRunningJobManager, j);
                    }
                } catch (Exception ignored) {
                    // Worker failures are intentionally not propagated; the tests judge the
                    // outcome through ZooKeeper state and the mocked logger instead.
                    // NOTE(review): consider surfacing these via Future.get() once it is
                    // confirmed that no expected exception reaches this point.
                }
                return null;
            };
            service.submit(task);
        }
        service.shutdown();
        if (!service.awaitTermination(WORKER_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
            service.shutdownNow();
            Assert.fail("worker threads did not finish within " + WORKER_TIMEOUT_MINUTES + " minutes");
        }
    }

    /**
     * Reads the classpath resource {@link #JOB_INFO_RESOURCE} fully into a byte array.
     *
     * @return the raw bytes of the resource
     * @throws IOException if the resource is not on the classpath or cannot be read
     */
    private byte[] generateZkSetData() throws IOException {
        // try-with-resources closes the stream on both the success and the failure path.
        try (InputStream jsonStream = this.getClass().getResourceAsStream(JOB_INFO_RESOURCE)) {
            if (jsonStream == null) {
                // getResourceAsStream signals "not found" with null rather than an exception.
                throw new IOException("Test resource not found on classpath: " + JOB_INFO_RESOURCE);
            }
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            byte[] buffer = new byte[BUFFER_SIZE];
            int count;
            while ((count = jsonStream.read(buffer, 0, BUFFER_SIZE)) != -1) {
                outputStream.write(buffer, 0, count);
            }
            return outputStream.toByteArray();
        }
    }
}