blob: 567f67eb3b504e3bc8180fe73729a3d240fe9998 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import com.datastax.driver.core.NettyOptions;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.server.BoundCluster;
import com.datastax.oss.simulacron.server.BoundNode;
import com.datastax.oss.simulacron.server.NodePerPortResolver;
import com.datastax.oss.simulacron.server.Server;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.routes.HealthCheck;
import org.apache.cassandra.sidecar.routes.HealthService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Longer run and more intensive tests for the HealthService and HealthCheck
*/
@DisplayName("Health Service Integration Tests")
public class HealthServiceIntegrationTest
{
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("HealthServiceTest-%d")
.build();
private static final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory);
private static final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory);
private static final NettyOptions shared = new NettyOptions()
{
public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
{
return sharedEventLoopGroup;
}
public void onClusterClose(EventLoopGroup eventLoopGroup)
{
}
public Timer timer(ThreadFactory threadFactory)
{
return sharedHWT;
}
public void onClusterClose(Timer timer)
{
}
};
private Vertx vertx;
private Router router;
private HttpServer httpServer;
private int port;
private List<CQLSession> sessions = new LinkedList<>();
@BeforeEach
void setUp() throws IOException
{
vertx = Vertx.vertx();
router = Router.router(vertx);
ServerSocket socket = new ServerSocket(0);
port = socket.getLocalPort();
httpServer = vertx.createHttpServer(new HttpServerOptions()
.setPort(port)
.setLogActivity(true));
}
@AfterEach
void tearDown()
{
vertx.close();
}
@AfterEach
public void closeClusters()
{
for (CQLSession session : sessions)
session.close();
sessions.clear();
}
@DisplayName("100 node cluster stopping, then starting")
@Test
public void testDownHost() throws InterruptedException
{
int nodeCount = 100;
try (Server server = Server.builder()
.withMultipleNodesPerIp(true)
.withAddressResolver(new NodePerPortResolver(new byte[]{ 127, 0, 0, 1 }, 49152))
.build())
{
ClusterSpec cluster = ClusterSpec.builder()
.withNodes(nodeCount)
.build();
BoundCluster bCluster = server.register(cluster);
Set<BoundNode> downNodes = new HashSet<>();
Map<BoundNode, HealthCheck> checks = new HashMap<>();
// Create a HealthCheck per node
for (BoundNode node : bCluster.getNodes())
checks.put(node, healthCheckFor(node, shared));
// verify all nodes marked as up
for (BoundNode node : bCluster.getNodes())
assertTrue(checks.get(node).get());
// shut down nodes one at a time, and verify we get correct response on all HealthChecks every iteration
for (int i = 0; downNodes.size() < nodeCount; i++)
{
for (BoundNode node : bCluster.getNodes())
assertEquals(checks.get(node).get(), !downNodes.contains(node));
bCluster.node(i).stop();
downNodes.add(bCluster.node(i));
}
// all hosts should be down
for (BoundNode node : bCluster.getNodes())
assertFalse(checks.get(node).get());
for (int i = 0; downNodes.size() > 0; i++)
{
bCluster.node(i).start();
downNodes.remove(bCluster.node(i));
}
// verify all nodes marked as up
long start = System.currentTimeMillis();
for (BoundNode node : bCluster.getNodes())
{
while ((System.currentTimeMillis() - start) < 10000 && !checks.get(node).get())
Thread.sleep(100);
assertTrue(checks.get(node).get());
}
}
}
@DisplayName("Down on startup, then comes up")
@Test
public void testDownHostTurnsOn() throws Throwable
{
VertxTestContext testContext = new VertxTestContext();
try (Server server = Server.builder()
.withMultipleNodesPerIp(true)
.withAddressResolver(new NodePerPortResolver(new byte[]{ 127, 0, 0, 1 }, 49152))
.build())
{
ClusterSpec cluster = ClusterSpec.builder()
.withNodes(1)
.build();
BoundCluster bCluster = server.register(cluster);
BoundNode node = bCluster.node(0);
node.stop();
CQLSession session = new CQLSession(node.inetSocketAddress(), shared);
sessions.add(session);
HealthCheck check = new HealthCheck(session);
HealthService service = new HealthService(new Configuration.Builder()
.setHealthCheckFrequency(1000)
.build(),
check, session);
service.start();
try
{
router.route("/health").handler(service::handleHealth);
httpServer.requestHandler(router);
httpServer.listen();
WebClient client = WebClient.create(vertx);
long start = System.currentTimeMillis();
client.get(port, "localhost", "/health")
.as(BodyCodec.string())
.send(testContext.succeeding(response -> testContext.verify(() ->
{
assertEquals(503, response.statusCode());
node.start();
while ((System.currentTimeMillis() - start) < (1000 * 60 * 2) && !check.get())
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
service.refreshNow();
client.get(port, "localhost", "/health")
.as(BodyCodec.string())
.send(testContext.succeeding(upResponse -> testContext.verify(() ->
{
assertEquals(200, upResponse.statusCode());
testContext.completeNow();
})));
})));
assertTrue(testContext.awaitCompletion(125, TimeUnit.SECONDS));
if (testContext.failed())
{
throw testContext.causeOfFailure();
}
}
finally
{
service.stop();
}
}
}
public HealthCheck healthCheckFor(BoundNode node, NettyOptions shared)
{
CQLSession session = new CQLSession(node.inetSocketAddress(), shared);
sessions.add(session);
return new HealthCheck(session);
}
}