/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import com.datastax.driver.core.NettyOptions;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.server.BoundCluster;
import com.datastax.oss.simulacron.server.BoundNode;
import com.datastax.oss.simulacron.server.NodePerPortResolver;
import com.datastax.oss.simulacron.server.Server;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.routes.HealthCheck;
import org.apache.cassandra.sidecar.routes.HealthService;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Longer-running and more intensive tests for the HealthService and HealthCheck.
 */
@DisplayName("Health Service Integration Tests")
public class HealthServiceIntegrationTest
{
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("HealthServiceTest-%d")
.build();
private static final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory);
private static final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory);
private static final Logger logger = LoggerFactory.getLogger(HealthServiceIntegrationTest.class);
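// Share a single Netty event loop group and timer across every driver Cluster these tests create;
// the onClusterClose overrides are deliberate no-ops so closing one session does not tear down
// resources still in use by the others.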
private static final NettyOptions shared = new NettyOptions()
{
@Override
public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
{
return sharedEventLoopGroup;
}

@Override
public void onClusterClose(EventLoopGroup eventLoopGroup)
{
}

@Override
public Timer timer(ThreadFactory threadFactory)
{
return sharedHWT;
}

@Override
public void onClusterClose(Timer timer)
{
}
};

private Vertx vertx;
private int port;
private final List<CQLSession> sessions = new LinkedList<>();
private Injector injector;

@BeforeEach
void setUp() throws InterruptedException
{
AtomicBoolean failedToListen = new AtomicBoolean(false);
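// The random port chosen in IntegrationTestModule.configuration() can be claimed by another
// process before we bind to it, so rebuild everything and retry until listen() succeeds.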
do
{
failedToListen.set(false);
injector = Guice.createInjector(Modules.override(new MainModule())
.with(new IntegrationTestModule(1, sessions)));
vertx = injector.getInstance(Vertx.class);
HttpServer httpServer = injector.getInstance(HttpServer.class);
Configuration config = injector.getInstance(Configuration.class);
port = config.getPort();
CountDownLatch waitLatch = new CountDownLatch(1);
httpServer.listen(port, res ->
{
if (res.succeeded())
{
logger.info("Listening on port " + port);
}
else
{
logger.error("Failed to listen on port " + port, res.cause());
failedToListen.set(true);
}
waitLatch.countDown();
});
if (waitLatch.await(60, TimeUnit.SECONDS))
logger.info("Listen completed before timeout.");
else
logger.error("Timed out waiting for the server to start listening.");
if (failedToListen.get())
closeClusters();
} while (failedToListen.get());
}

@AfterEach
void tearDown() throws InterruptedException
{
CountDownLatch waitLatch = new CountDownLatch(1);
vertx.close(res -> waitLatch.countDown());
if (waitLatch.await(60, TimeUnit.SECONDS))
logger.info("Close completed before timeout.");
else
logger.error("Timed out waiting for Vert.x to close.");
}

@AfterEach
public void closeClusters()
{
for (CQLSession session : sessions)
session.close();
sessions.clear();
}

/**
 * This test has a race condition that can result in test failure: be sure to wait long enough
 * for each restarted node to register as up.
 * See CASSANDRA-15615
 */
@DisplayName("100 node cluster stopping, then starting")
@Test
public void testDownHost() throws InterruptedException
{
int nodeCount = 100;
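// Simulacron gives each node its own port on 127.0.0.1 (starting at 49152), so the whole
// 100-node cluster fits on a single loopback address.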
try (Server server = Server.builder()
.withMultipleNodesPerIp(true)
.withAddressResolver(new NodePerPortResolver(new byte[]{ 127, 0, 0, 1 }, 49152))
.build())
{
ClusterSpec cluster = ClusterSpec.builder()
.withNodes(nodeCount)
.build();
BoundCluster bCluster = server.register(cluster);
Set<BoundNode> downNodes = new HashSet<>();
Map<BoundNode, HealthCheck> checks = new HashMap<>();
logger.info("Create a health check per node");
for (BoundNode node : bCluster.getNodes())
checks.put(node, healthCheckFor(node, shared));
logger.info("verify all nodes marked as up");
for (BoundNode node : bCluster.getNodes())
assertTrue(checks.get(node).get());
logger.info("shut down nodes one at a time, and verify we get correct response on all HealthChecks");
for (int i = 0; downNodes.size() < nodeCount; i++)
{
for (BoundNode node : bCluster.getNodes())
assertEquals(!downNodes.contains(node), checks.get(node).get());
bCluster.node(i).stop();
downNodes.add(bCluster.node(i));
}
logger.info("all hosts should be down");
for (BoundNode node : bCluster.getNodes())
assertFalse(checks.get(node).get());
logger.info("Starting nodes back up");
int i;
for (i = 0; !downNodes.isEmpty(); i++)
{
bCluster.node(i).start();
downNodes.remove(bCluster.node(i));
}
logger.info("Nodes started back up: " + i);
logger.info("verify all nodes marked as up");
long start = System.currentTimeMillis();
int checkNumber = 0;
for (BoundNode node : bCluster.getNodes())
{
while ((System.currentTimeMillis() - start) < 20000 && !checks.get(node).get())
Thread.sleep(250);
logger.info("Started node " + checkNumber);
assertTrue(checks.get(node).get(), "Failed on node " + checkNumber);
checkNumber++;
}
}
}
@DisplayName("Down on startup, then comes up")
@Test
public void testDownHostTurnsOn() throws Throwable
{
VertxTestContext testContext = new VertxTestContext();
BoundCluster bc = injector.getInstance(BoundCluster.class);
BoundNode node = bc.node(0);
HealthCheck check = injector.getInstance(HealthCheck.class);
HealthService service = injector.getInstance(HealthService.class);
Server server = injector.getInstance(Server.class);
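// IntegrationTestModule registers a one-node cluster and stops it immediately, so the first
// health request must return 503 until node 0 is started below.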
try
{
WebClient client = WebClient.create(vertx);
long start = System.currentTimeMillis();
client.get(port, "localhost", "/api/v1/__health")
.as(BodyCodec.string())
.send(testContext.succeeding(response -> testContext.verify(() ->
{
assertEquals(503, response.statusCode());
node.start();
while ((System.currentTimeMillis() - start) < (1000 * 60 * 2) && !check.get())
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
service.refreshNow();
client.get(port, "localhost", "/api/v1/__health")
.as(BodyCodec.string())
.send(testContext.succeeding(upResponse -> testContext.verify(() ->
{
assertEquals(200, upResponse.statusCode());
testContext.completeNow();
})));
})));
assertTrue(testContext.awaitCompletion(125, TimeUnit.SECONDS));
if (testContext.failed())
{
throw testContext.causeOfFailure();
}
}
finally
{
service.stop();
server.close();
}
}
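
/**
 * Builds a HealthCheck backed by a fresh CQLSession for the given node. The session is tracked
 * so closeClusters() can close it after each test.
 */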
public HealthCheck healthCheckFor(BoundNode node, NettyOptions shared)
{
CQLSession session = new CQLSession(node.inetSocketAddress(), shared);
sessions.add(session);
return new HealthCheck(session);
}
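
/**
 * Guice module that overrides MainModule with a Simulacron-backed cluster whose nodes start out
 * stopped, plus a Configuration bound to a random free port on 127.0.0.1.
 */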
private static class IntegrationTestModule extends AbstractModule
{
private final int nodeCount;
private final List<CQLSession> sessions;
private IntegrationTestModule(int count, List<CQLSession> sessions)
{
this.nodeCount = count;
this.sessions = sessions;
}

@Provides
@Singleton
BoundCluster cluster(Server server)
{
ClusterSpec cluster = ClusterSpec.builder()
.withNodes(nodeCount)
.build();
BoundCluster bc = server.register(cluster);
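// Stop every node immediately so each test starts with Cassandra unreachable.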
for (BoundNode n : bc.getNodes())
n.stop();
return bc;
}

@Provides
@Singleton
BoundNode node(BoundCluster bc)
{
return bc.node(0);
}

@Provides
@Singleton
Server server()
{
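// Same loopback resolver as testDownHost: each simulated node gets its own port on 127.0.0.1.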
return Server.builder()
.withMultipleNodesPerIp(true)
.withAddressResolver(new NodePerPortResolver(new byte[]{ 127, 0, 0, 1 }, 49152))
.build();
}

@Provides
@Singleton
HealthCheck healthCheck(BoundNode node)
{
CQLSession session = new CQLSession(node.inetSocketAddress(), shared);
sessions.add(session);
return new HealthCheck(session);
}

@Provides
@Singleton
public Configuration configuration() throws IOException
{
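// Ask the OS for a free ephemeral port and release it right away; another process can claim it
// before the HTTP server binds, which is why setUp() retries on listen failures.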
int randomPort;
try (ServerSocket socket = new ServerSocket(0))
{
randomPort = socket.getLocalPort();
}
return new Configuration.Builder()
.setCassandraHost("INVALID_FOR_TEST")
.setCassandraPort(0)
.setHost("127.0.0.1")
.setPort(randomPort)
.setHealthCheckFrequency(1000)
.setSslEnabled(false)
.build();
}
}
}