blob: f6c01c4ec2b120ba2b601a56ed6fe435a022d0a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.leader;
import java.io.IOException;
import org.apache.curator.test.Compatibility;
import org.apache.curator.test.TestingZooKeeperMain;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerShutdownHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A connection factory that will behave like the NIOServerCnxnFactory except that
* it will unexpectedly close the connection right after the <b>first</b> znode has
* been created in Zookeeper.
* Subsequent create operations will succeed.
*/
public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory {
public static final String CHAOS_ZNODE = "/mylock";
public static final String CHAOS_ZNODE_PREFIX = CHAOS_ZNODE + "/";
private static final Logger log = LoggerFactory.getLogger(ChaosMonkeyCnxnFactory.class);
/* How long after the first error, connections are rejected */
public static final long LOCKOUT_DURATION_MS = 6000;
@Override
public void startup(ZooKeeperServer zks) throws IOException, InterruptedException {
super.startup(new ChaosMonkeyZookeeperServer(zks));
}
public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer {
private final ZooKeeperServer zks;
private long firstError = 0;
public ChaosMonkeyZookeeperServer(ZooKeeperServer zks) {
this.zks = zks;
setTxnLogFactory(zks.getTxnLogFactory());
setTickTime(zks.getTickTime());
setMinSessionTimeout(zks.getMinSessionTimeout());
setMaxSessionTimeout(zks.getMaxSessionTimeout());
}
@Override
public void startup() {
super.startup();
if (zks instanceof TestingZooKeeperMain.TestZooKeeperServer) {
((TestingZooKeeperMain.TestZooKeeperServer) zks).noteStartup();
}
}
@Override
public void submitRequest(Request si) {
long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0;
if (si.type != ZooDefs.OpCode.createSession
&& si.type != ZooDefs.OpCode.sync
&& si.type != ZooDefs.OpCode.ping
&& firstError != 0
&& remaining > 0) {
log.debug("Rejected : {}", si);
// Still reject request
log.debug("Still not ready for " + remaining + "ms");
Compatibility.serverCnxnClose(si.cnxn);
return;
}
// Submit the request to the legacy Zookeeper server
log.debug("Applied : {}", si);
super.submitRequest(si);
// Raise an error if a lock is created
if ((si.type == ZooDefs.OpCode.create) || (si.type == ZooDefs.OpCode.create2)) {
try {
CreateRequest createRequest = si.readRequestRecord(CreateRequest::new);
if (createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX) && firstError == 0) {
firstError = System.currentTimeMillis();
// The znode has been created, close the connection and don't tell it to client
log.warn("Closing connection right after " + createRequest.getPath() + " creation");
Compatibility.serverCnxnClose(si.cnxn);
}
} catch (Exception e) {
// Should not happen
Compatibility.serverCnxnClose(si.cnxn);
}
}
}
@Override
public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
return zks.getZkShutdownHandler();
}
}
}