blob: 6eed32d189e38449ce545c13b3447cea8e7e1568 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.test.ZKXTestCase;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ZKUtils;
public class TestZKJobsConcurrencyService extends ZKXTestCase {
@Override
protected void setUp() throws Exception {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
public void testRegisterUnregister() throws Exception {
assertEquals(0, ZKUtils.getUsers().size());
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
try {
zkjcs.init(Services.get());
assertEquals(1, ZKUtils.getUsers().size());
assertEquals(zkjcs, ZKUtils.getUsers().iterator().next());
zkjcs.destroy();
assertEquals(0, ZKUtils.getUsers().size());
}
finally {
zkjcs.destroy();
}
}
public void testIsLeader() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers. It chooses randomly so we can't check that a
// specific server gets chosen.
DummyZKOozie dummyOozie = null;
DummyZKOozie dummyOozie2 = null;
try {
zkjcs.init(Services.get());
dummyOozie = new DummyZKOozie("a", "http://blah", true);
dummyOozie2 = new DummyZKOozie("b", "http://blah", true);
sleep(3 * 1000);
if (zkjcs.isLeader()) {
assertFalse(dummyOozie.isLeader());
assertFalse(dummyOozie2.isLeader());
zkjcs.destroy();
sleep(3 * 1000);
if (dummyOozie.isLeader()) {
assertFalse(dummyOozie2.isLeader());
} else if (dummyOozie2.isLeader()) {
assertFalse(dummyOozie.isLeader());
} else {
fail("No leader");
}
} else if (dummyOozie.isLeader()) {
assertFalse(zkjcs.isLeader());
assertFalse(dummyOozie2.isLeader());
dummyOozie.teardown();
sleep(3 * 1000);
if (zkjcs.isLeader()) {
assertFalse(dummyOozie2.isLeader());
} else if (dummyOozie2.isLeader()) {
assertFalse(zkjcs.isLeader());
} else {
fail("No leader");
}
} else if (dummyOozie2.isLeader()) {
assertFalse(zkjcs.isLeader());
assertFalse(dummyOozie.isLeader());
dummyOozie2.teardown();
sleep(3 * 1000);
if (zkjcs.isLeader()) {
assertFalse(dummyOozie.isLeader());
} else if (dummyOozie.isLeader()) {
assertFalse(zkjcs.isLeader());
} else {
fail("No leader");
}
} else {
fail("No leader");
}
}
finally {
zkjcs.destroy();
if (dummyOozie != null) {
dummyOozie.teardown();
}
if (dummyOozie2 != null) {
dummyOozie2.teardown();
}
}
}
public void testIsJobIdForThisServer() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence isJobIdForThisServer()
// once they are running in that the indecies of the job ids will cause each job id to belong to different Oozie "servers"
DummyZKOozie dummyOozie = null;
DummyZKOozie dummyOozie2 = null;
try {
dummyOozie = new DummyZKOozie("a", "http://blah");
zkjcs.init(Services.get());
assertFalse(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("blah"));
dummyOozie2 = new DummyZKOozie("b", "http://blah");
assertFalse(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("blah"));
dummyOozie.teardown();
assertTrue(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
assertFalse(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("blah"));
dummyOozie2.teardown();
assertTrue(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("blah"));
}
finally {
zkjcs.destroy();
if (dummyOozie != null) {
dummyOozie.teardown();
}
if (dummyOozie2 != null) {
dummyOozie2.teardown();
}
}
}
public void testIsJobIdForThisServerBadZk() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
try {
zkjcs.init(Services.get());
assertTrue(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
assertTrue(zkjcs.isJobIdForThisServer("blah"));
zkjcs.zk.getClient().close(); // simulating zookeeper problem
sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
try {
zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W");
fail("Expected ServletException");
} catch (ServiceException e) {
assertEquals(ErrorCode.E1700, e.getErrorCode());
}
}
finally {
zkjcs.destroy();
}
}
public void testGetJobIdsForThisServer() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence getJobIdsForThisServer()
// once they are running in that the indecies of the job ids will cause each job id to belong to different Oozie "servers"
DummyZKOozie dummyOozie = null;
DummyZKOozie dummyOozie2 = null;
try {
List<String> ids = new ArrayList<String>();
ids.add("0000000-130521183438837-oozie-rkan-W");
ids.add("0000001-130521183438837-oozie-rkan-W");
ids.add("0000002-130521183438837-oozie-rkan-W");
ids.add("0000003-130521183438837-oozie-rkan-W");
ids.add("0000004-130521183438837-oozie-rkan-W");
ids.add("0000005-130521183438837-oozie-rkan-W");
ids.add("0000006-130521183438837-oozie-rkan-W");
ids.add("blah");
dummyOozie = new DummyZKOozie("a", "http://blah");
zkjcs.init(Services.get());
List<String> ids2 = zkjcs.getJobIdsForThisServer(ids);
List<String> ids3 = new ArrayList<String>();
ids3.add("0000001-130521183438837-oozie-rkan-W");
ids3.add("0000003-130521183438837-oozie-rkan-W");
ids3.add("0000005-130521183438837-oozie-rkan-W");
ids3.add("blah");
assertEquals(4, ids2.size());
assertTrue(ids2.containsAll(ids3));
dummyOozie2 = new DummyZKOozie("b", "http://blah");
ids2 = zkjcs.getJobIdsForThisServer(ids);
ids3 = new ArrayList<String>();
ids3.add("0000001-130521183438837-oozie-rkan-W");
ids3.add("0000004-130521183438837-oozie-rkan-W");
ids3.add("blah");
assertEquals(3, ids2.size());
assertTrue(ids2.containsAll(ids3));
dummyOozie.teardown();
ids2 = zkjcs.getJobIdsForThisServer(ids);
ids3 = new ArrayList<String>();
ids3.add("0000000-130521183438837-oozie-rkan-W");
ids3.add("0000002-130521183438837-oozie-rkan-W");
ids3.add("0000004-130521183438837-oozie-rkan-W");
ids3.add("0000006-130521183438837-oozie-rkan-W");
ids3.add("blah");
assertEquals(5, ids2.size());
assertTrue(ids2.containsAll(ids3));
dummyOozie2.teardown();
ids2 = zkjcs.getJobIdsForThisServer(ids);
assertEquals(8, ids2.size());
assertTrue(ids2.containsAll(ids));
}
finally {
zkjcs.destroy();
if (dummyOozie != null) {
dummyOozie.teardown();
}
if (dummyOozie2 != null) {
dummyOozie2.teardown();
}
}
}
public void testGetJobIdsForThisServerBadZk() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
try {
zkjcs.init(Services.get());
List<String> ids = new ArrayList<String>();
ids.add("0000000-130521183438837-oozie-rkan-W");
ids.add("0000001-130521183438837-oozie-rkan-W");
ids.add("0000002-130521183438837-oozie-rkan-W");
ids.add("0000003-130521183438837-oozie-rkan-W");
ids.add("0000004-130521183438837-oozie-rkan-W");
ids.add("0000005-130521183438837-oozie-rkan-W");
ids.add("0000006-130521183438837-oozie-rkan-W");
ids.add("blah");
List<String> ids2 = zkjcs.getJobIdsForThisServer(ids);
assertEquals(8, ids2.size());
assertTrue(ids2.containsAll(ids));
zkjcs.zk.getClient().close(); // simulating zookeeper problem
sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
try {
zkjcs.getJobIdsForThisServer(ids);
fail("Expected ServletException");
} catch (ServiceException e) {
assertEquals(ErrorCode.E1700, e.getErrorCode());
}
}
finally {
zkjcs.destroy();
}
}
public void testGetServerUrls() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence getServerUrls()
// once they are running in that the there will be other Oozie "server" urls to return
DummyZKOozie dummyOozie = null;
DummyZKOozie dummyOozie2 = null;
try {
zkjcs.init(Services.get());
Map<String, String> map = zkjcs.getServerUrls();
assertEquals(1, map.size());
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals(ConfigUtils.getOozieURL(false), map.get(ZK_ID));
dummyOozie = new DummyZKOozie("0000", "http://blah1");
map = zkjcs.getServerUrls();
assertEquals(2, map.size());
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals(ConfigUtils.getOozieURL(false), map.get(ZK_ID));
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals("http://blah1", map.get("0000"));
dummyOozie2 = new DummyZKOozie("z", "http://blah2");
map = zkjcs.getServerUrls();
assertEquals(3, map.size());
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals(ConfigUtils.getOozieURL(false), map.get(ZK_ID));
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals("http://blah1", map.get("0000"));
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals("http://blah2", map.get("z"));
dummyOozie.teardown();
map = zkjcs.getServerUrls();
assertEquals(2, map.size());
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals(ConfigUtils.getOozieURL(false), map.get(ZK_ID));
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals("http://blah2", map.get("z"));
dummyOozie2.teardown();
map = zkjcs.getServerUrls();
assertEquals(1, map.size());
assertEquals(ZK_ID, map.keySet().iterator().next());
assertEquals(ConfigUtils.getOozieURL(false), map.get(ZK_ID));
}
finally {
zkjcs.destroy();
if (dummyOozie != null) {
dummyOozie.teardown();
}
if (dummyOozie2 != null) {
dummyOozie2.teardown();
}
}
}
public void testisAllServerRequest() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
try {
zkjcs.init(Services.get());
assertTrue(zkjcs.isAllServerRequest(null));
Map<String, String[]> param = new HashMap<String, String[]>();
assertTrue(zkjcs.isAllServerRequest(param));
param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "test" });
assertTrue(zkjcs.isAllServerRequest(param));
param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "true" });
assertTrue(zkjcs.isAllServerRequest(param));
param.put(RestConstants.ALL_SERVER_REQUEST, new String[] { "false" });
assertFalse(zkjcs.isAllServerRequest(param));
}
finally {
zkjcs.destroy();
}
}
public void testInstrumentation() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence the instrumentation
// once they are running in that the there will be other Oozie "servers"
DummyZKOozie dummyOozie = null;
DummyZKOozie dummyOozie2 = null;
Instrumentation instr = new Instrumentation();
try {
zkjcs.init(Services.get());
zkjcs.instrument(instr);
String servers = ZK_ID + "=" + ConfigUtils.getOozieURL(false);
assertEquals(servers, instr.getVariables().get("oozie").get("servers").getValue());
dummyOozie = new DummyZKOozie("0000", "http://blah1");
servers = ZK_ID + "=" + ConfigUtils.getOozieURL(false) + ",0000=http://blah1";
assertEquals(servers, instr.getVariables().get("oozie").get("servers").getValue());
dummyOozie2 = new DummyZKOozie("z", "http://blah2");
servers = ZK_ID + "=" + ConfigUtils.getOozieURL(false) + ",0000=http://blah1" + ",z=http://blah2";
assertEquals(servers, instr.getVariables().get("oozie").get("servers").getValue());
dummyOozie.teardown();
servers = ZK_ID + "=" + ConfigUtils.getOozieURL(false) + ",z=http://blah2";
assertEquals(servers, instr.getVariables().get("oozie").get("servers").getValue());
dummyOozie2.teardown();
servers = ZK_ID + "=" + ConfigUtils.getOozieURL(false);
assertEquals(servers, instr.getVariables().get("oozie").get("servers").getValue());
} finally {
zkjcs.destroy();
if (dummyOozie != null) {
dummyOozie.teardown();
}
if (dummyOozie2 != null) {
dummyOozie2.teardown();
}
}
}
}