// blob: 550bfd78ba584c5559623fa2d4788c2af95160ec [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Harness for starting dummy ZK FailoverControllers, each associated with a
 * DummyHAService. {@link #start()} starts two such ZKFCs, designated by
 * indexes 0 and 1; {@link #start(int)} can start a larger number. The class
 * also provides utilities for building tests around the running ZKFCs.
 */
public class MiniZKFCCluster {
  private static final Logger LOG =
      LoggerFactory.getLogger(MiniZKFCCluster.class);

  private final TestContext ctx;
  private final ZooKeeperServer zks;
  private final List<DummyHAService> svcs;
  private final Configuration conf;
  private final DummySharedResource sharedResource = new DummySharedResource();
  /** One thread per started ZKFC; null until {@link #start(int)} is called. */
  private DummyZKFCThread[] thrs;

  /**
   * Creates the harness and its first two dummy services. Nothing is started
   * until {@link #start()} or {@link #start(int)} is called.
   *
   * @param conf configuration handed to every ZKFC; the health-monitor
   *             intervals in it are overwritten with 50 so tests run faster
   * @param zks the ZooKeeper server the tests run against
   */
  public MiniZKFCCluster(Configuration conf, ZooKeeperServer zks) {
    this.conf = conf;
    // Fast check interval so tests run faster
    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
    svcs = new ArrayList<DummyHAService>(2);
    // remove any existing instances we are keeping track of
    DummyHAService.instances.clear();
    for (int i = 0; i < 2; i++) {
      addSvcs(svcs, i);
    }
    this.ctx = new TestContext();
    this.zks = zks;
  }

  /**
   * Creates one more dummy service in INITIALIZING state, attaches the
   * cluster-wide shared resource to it and appends it to the given list.
   *
   * @param svcs the list to append the new service to
   * @param i zero-based index of the new service; its address uses the host
   *          name "svc" + (i + 1) and port 1234
   */
  private void addSvcs(List<DummyHAService> svcs, int i) {
    DummyHAService svc = new DummyHAService(HAServiceState.INITIALIZING,
        new InetSocketAddress("svc" + (i + 1), 1234));
    // Configure the new instance directly instead of looking it up by index,
    // so the right service is wired even if the list size differs from i.
    svc.setSharedResource(sharedResource);
    svcs.add(svc);
  }

  /**
   * Set up two services and their failover controllers. svc1 is started
   * first, so that it enters ACTIVE state, and then svc2 is started,
   * which enters STANDBY
   */
  public void start() throws Exception {
    start(2);
  }

  /**
   * Set up the specified number of services and their failover controllers.
   * svc1 is started first, so that it enters ACTIVE state, and then
   * svc2...svcN are started one at a time, each of which enters STANDBY.
   * <p>
   * Adds any extra svc needed beyond the ones that already exist before
   * starting the cluster.
   *
   * @param count number of zkfcs to start; must be at least 1
   * @throws IllegalArgumentException if {@code count} is less than 1
   */
  public void start(int count) throws Exception {
    Preconditions.checkArgument(count >= 1,
        "count must be at least 1, got %s", count);
    // Set up the expected number of zkfcs, if we need to add more. Topping up
    // from the current size (rather than a hard-coded 2) keeps the list
    // consistent even if extra services were already added earlier.
    for (int i = svcs.size(); i < count; i++) {
      addSvcs(svcs, i);
    }
    // Format the base dir, should succeed
    thrs = new DummyZKFCThread[count];
    thrs[0] = new DummyZKFCThread(ctx, svcs.get(0));
    assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
    // NOTE(review): only thrs[0] is registered with the TestContext; the
    // remaining threads below are started directly. Confirm this is intended.
    ctx.addThread(thrs[0]);
    thrs[0].start();
    LOG.info("Waiting for svc0 to enter active state");
    waitForHAState(0, HAServiceState.ACTIVE);
    // add the remaining zkfc
    for (int i = 1; i < count; i++) {
      LOG.info("Adding svc{}", i);
      thrs[i] = new DummyZKFCThread(ctx, svcs.get(i));
      thrs[i].start();
      waitForHAState(i, HAServiceState.STANDBY);
    }
  }

  /**
   * Stop the services: interrupts every ZKFC thread that was created, stops
   * the test context and finally asserts that the shared resource saw no
   * violations.
   * @throws Exception if either of the services had encountered a fatal error
   */
  public void stop() throws Exception {
    if (thrs != null) {
      for (DummyZKFCThread thr : thrs) {
        // Slots stay null if start() failed part-way through.
        if (thr != null) {
          thr.interrupt();
        }
      }
    }
    ctx.stop();
    sharedResource.assertNoViolations();
  }

  /**
   * @return the TestContext implementation used internally. This allows more
   * threads to be added to the context, etc.
   */
  public TestContext getTestContext() {
    return ctx;
  }

  /**
   * @param i index of the service
   * @return the dummy HA service at the given index
   */
  public DummyHAService getService(int i) {
    return svcs.get(i);
  }

  /**
   * @param i index of the ZKFC
   * @return the elector of the ZKFC at the given index
   * @throws IllegalStateException if that ZKFC has not been started
   */
  public ActiveStandbyElector getElector(int i) {
    return getZkfc(i).getElectorForTests();
  }

  /**
   * @param i index of the ZKFC
   * @return the ZKFC at the given index
   * @throws IllegalStateException if that ZKFC has not been started
   */
  public DummyZKFC getZkfc(int i) {
    Preconditions.checkState(thrs != null,
        "MiniZKFCCluster has not been started");
    DummyZKFCThread thr = thrs[i];
    Preconditions.checkState(thr != null,
        "ZKFC %s has not been started", i);
    return thr.zkfc;
  }

  /** Sets the {@code isHealthy} flag of the service at {@code idx}. */
  public void setHealthy(int idx, boolean healthy) {
    svcs.get(idx).isHealthy = healthy;
  }

  /** Sets the {@code failToBecomeActive} flag of the service at {@code idx}. */
  public void setFailToBecomeActive(int idx, boolean doFail) {
    svcs.get(idx).failToBecomeActive = doFail;
  }

  /** Sets the {@code failToBecomeStandby} flag of the service at {@code idx}. */
  public void setFailToBecomeStandby(int idx, boolean doFail) {
    svcs.get(idx).failToBecomeStandby = doFail;
  }

  /** Sets the {@code failToFence} flag of the service at {@code idx}. */
  public void setFailToFence(int idx, boolean doFail) {
    svcs.get(idx).failToFence = doFail;
  }

  /** Sets the {@code actUnreachable} flag of the service at {@code idx}. */
  public void setUnreachable(int idx, boolean unreachable) {
    svcs.get(idx).actUnreachable = unreachable;
  }

  /** Sets the {@code failToBecomeObserver} flag of the service at {@code idx}. */
  public void setFailToBecomeObserver(int idx, boolean doFail) {
    svcs.get(idx).failToBecomeObserver = doFail;
  }

  /**
   * Wait for the given HA service to enter the given HA state.
   * This is based on the state of ZKFC, not the state of HA service.
   * There could be difference between the two. For example,
   * When the service becomes unhealthy, ZKFC will quit ZK election and
   * transition to HAServiceState.INITIALIZING and remain in that state
   * until the service becomes healthy.
   * <p>
   * The state is polled every 50 ms. This method has no timeout of its own;
   * it only returns early (by throwing) when the test context reports an
   * exception.
   *
   * @param idx index of the ZKFC to watch
   * @param state the state to wait for
   * @throws Exception if the test context has recorded an exception, or the
   *         waiting thread is interrupted
   */
  public void waitForHAState(int idx, HAServiceState state)
      throws Exception {
    DummyZKFC svc = getZkfc(idx);
    while (svc.getServiceState() != state) {
      ctx.checkException();
      Thread.sleep(50);
    }
  }

  /**
   * Wait for the ZKFC to be notified of a change in health state.
   *
   * @param idx index of the ZKFC to watch
   * @param state the health state to wait for
   */
  public void waitForHealthState(int idx, State state)
      throws Exception {
    ZKFCTestUtil.waitForHealthState(getZkfc(idx), state, ctx);
  }

  /**
   * Wait for the given elector to enter the given elector state.
   * @param idx the index of the ZKFC whose elector is watched
   * @param state the state to wait for
   * @throws Exception if it times out, or an exception occurs on one
   * of the ZKFC threads while waiting.
   */
  public void waitForElectorState(int idx,
      ActiveStandbyElector.State state) throws Exception {
    ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
        getElector(idx), state);
  }

  /**
   * Expire the ZK session of the given service. This requires
   * (and asserts) that the given service be the current active: the data
   * stored in the lock znode must equal the encoded index of the service.
   * The session that owns the lock znode is then closed on the server.
   *
   * @param idx index of the service expected to hold the lock
   * @throws NoNodeException if no service holds the lock
   */
  public void expireActiveLockHolder(int idx)
      throws NoNodeException {
    Stat stat = new Stat();
    byte[] data = zks.getZKDatabase().getData(
        DummyZKFC.LOCK_ZNODE, stat, null);
    assertArrayEquals(Ints.toByteArray(svcs.get(idx).index), data);
    long session = stat.getEphemeralOwner();
    LOG.info("Expiring svc {}'s zookeeper session {}", idx, session);
    zks.closeSession(session);
  }

  /**
   * Wait for the given HA service to become the active lock holder.
   * If the passed index is null, waits for there to be no active
   * lock holder.
   *
   * @param idx index of the service expected to hold the lock, or null
   */
  public void waitForActiveLockHolder(Integer idx)
      throws Exception {
    byte[] expectedLockData =
        (idx == null) ? null : Ints.toByteArray(svcs.get(idx).index);
    ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
        DummyZKFC.SCOPED_PARENT_ZNODE, expectedLockData);
  }

  /**
   * Expires the ZK session associated with service 'fromIdx', and waits
   * until service 'toIdx' takes over. Session re-establishment on the
   * 'fromIdx' elector is disabled for the duration of the check and is
   * always re-enabled afterwards.
   *
   * @param fromIdx index of the currently active service
   * @param toIdx index of the service expected to become active; must differ
   *        from {@code fromIdx}
   * @throws Exception if the target service does not become active
   */
  public void expireAndVerifyFailover(int fromIdx, int toIdx)
      throws Exception {
    Preconditions.checkArgument(fromIdx != toIdx);
    getElector(fromIdx).preventSessionReestablishmentForTests();
    try {
      expireActiveLockHolder(fromIdx);
      waitForHAState(fromIdx, HAServiceState.STANDBY);
      waitForHAState(toIdx, HAServiceState.ACTIVE);
    } finally {
      getElector(fromIdx).allowSessionReestablishmentForTests();
    }
  }

  /**
   * Test-thread which runs a ZK Failover Controller corresponding
   * to a given dummy service.
   */
  private class DummyZKFCThread extends TestingThread {
    private final DummyZKFC zkfc;

    public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
      super(ctx);
      // Uses the enclosing cluster's configuration.
      this.zkfc = new DummyZKFC(conf, svc);
    }

    /** Runs the ZKFC with no arguments and asserts that it exits with 0. */
    @Override
    public void doWork() throws Exception {
      try {
        assertEquals(0, zkfc.run(new String[0]));
      } catch (InterruptedException ie) {
        // Interrupted by main thread, that's OK.
      }
    }
  }

  /**
   * ZKFailoverController for DummyHAService targets. Services are identified
   * in ZooKeeper by their integer index.
   */
  static class DummyZKFC extends ZKFailoverController {
    private static final String DUMMY_CLUSTER = "dummy-cluster";
    public static final String SCOPED_PARENT_ZNODE =
        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
        DUMMY_CLUSTER;
    private static final String LOCK_ZNODE =
        SCOPED_PARENT_ZNODE + "/" + ActiveStandbyElector.LOCK_FILENAME;

    private final DummyHAService localTarget;

    public DummyZKFC(Configuration conf, DummyHAService localTarget) {
      super(conf, localTarget);
      this.localTarget = localTarget;
    }

    /** Encodes the target as the big-endian bytes of its index. */
    @Override
    protected byte[] targetToData(HAServiceTarget target) {
      return Ints.toByteArray(((DummyHAService)target).index);
    }

    /** Decodes an index from the data and looks up the matching service. */
    @Override
    protected HAServiceTarget dataToTarget(byte[] data) {
      int index = Ints.fromByteArray(data);
      return DummyHAService.getInstance(index);
    }

    /** Intentionally a no-op. */
    @Override
    protected void loginAsFCUser() throws IOException {
    }

    @Override
    protected String getScopeInsideParentNode() {
      return DUMMY_CLUSTER;
    }

    /** Intentionally a no-op: this override performs no access check. */
    @Override
    protected void checkRpcAdminAccess() throws AccessControlException {
    }

    /** Binds to the wildcard address on port 0 (system-chosen port). */
    @Override
    protected InetSocketAddress getRpcAddressToBindTo() {
      return new InetSocketAddress(0);
    }

    /** Initializes RPC and publishes the RPC server on the local target. */
    @Override
    protected void initRPC() throws IOException {
      super.initRPC();
      localTarget.zkfcProxy = this.getRpcServerForTests();
    }

    @Override
    protected PolicyProvider getPolicyProvider() {
      return null;
    }

    /**
     * @return every registered DummyHAService instance except the local
     * target (compared by identity)
     */
    @Override
    protected List<HAServiceTarget> getAllOtherNodes() {
      List<HAServiceTarget> services = new ArrayList<HAServiceTarget>(
          DummyHAService.instances.size());
      for (DummyHAService service : DummyHAService.instances) {
        if (service != this.localTarget) {
          services.add(service);
        }
      }
      return services;
    }
  }
}