| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.distributed.test; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.distributed.Cluster; |
| import org.apache.cassandra.distributed.api.Feature; |
| import org.apache.cassandra.distributed.api.IInstanceConfig; |
| import org.apache.cassandra.distributed.api.IInvokableInstance; |
| import org.apache.cassandra.distributed.api.LogResult; |
| import org.apache.cassandra.distributed.api.SimpleQueryResult; |
| import org.apache.cassandra.distributed.api.TokenSupplier; |
| import org.apache.cassandra.distributed.shared.Byteman; |
| import org.apache.cassandra.distributed.shared.NetworkTopology; |
| import org.apache.cassandra.distributed.shared.Shared; |
| |
| /** |
| * Replaces python dtest bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled |
| */ |
| public class BootstrapBinaryDisabledTest extends TestBaseImpl |
| { |
| @Test |
| public void test() throws IOException, TimeoutException |
| { |
| Map<String, Object> config = new HashMap<>(); |
| config.put("authenticator", "org.apache.cassandra.auth.PasswordAuthenticator"); |
| config.put("authorizer", "org.apache.cassandra.auth.CassandraAuthorizer"); |
| config.put("role_manager", "org.apache.cassandra.auth.CassandraRoleManager"); |
| config.put("permissions_validity_in_ms", 0); |
| config.put("roles_validity_in_ms", 0); |
| |
| int originalNodeCount = 1; |
| int expandedNodeCount = originalNodeCount + 2; |
| Byteman byteman = Byteman.createFromScripts("test/resources/byteman/stream_failure.btm"); |
| try (Cluster cluster = init(Cluster.build(originalNodeCount) |
| .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) |
| .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) |
| .withConfig(c -> { |
| config.forEach(c::set); |
| c.with(Feature.GOSSIP, Feature.NETWORK, Feature.NATIVE_PROTOCOL); |
| }) |
| .withInstanceInitializer((cl, nodeNumber) -> { |
| switch (nodeNumber) { |
| case 1: |
| case 2: |
| byteman.install(cl); |
| break; |
| } |
| }) |
| .start())) |
| { |
| cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk text primary key)"); |
| populate(cluster.get(1)); |
| cluster.forEach(c -> c.flush(KEYSPACE)); |
| |
| bootstrap(cluster, config, false); |
| // Test write survey behaviour |
| bootstrap(cluster, config, true); |
| } |
| } |
| |
| private static void bootstrap(Cluster cluster, |
| Map<String, Object> config, |
| boolean isWriteSurvey) throws TimeoutException |
| { |
| IInstanceConfig nodeConfig = cluster.newInstanceConfig(); |
| nodeConfig.set("auto_bootstrap", true); |
| config.forEach(nodeConfig::set); |
| |
| //TODO can we make this more isolated? |
| System.setProperty("cassandra.ring_delay_ms", "5000"); |
| if (isWriteSurvey) |
| System.setProperty("cassandra.write_survey", "true"); |
| |
| RewriteEnabled.enable(); |
| cluster.bootstrap(nodeConfig).startup(); |
| IInvokableInstance node = cluster.get(cluster.size()); |
| assertLogHas(node, "Some data streaming failed"); |
| assertLogHas(node, isWriteSurvey ? |
| "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled" : |
| "Node is not yet bootstrapped completely"); |
| |
| node.nodetoolResult("join").asserts() |
| .failure() |
| .errorContains("Cannot join the ring until bootstrap completes"); |
| |
| RewriteEnabled.disable(); |
| node.nodetoolResult("bootstrap", "resume").asserts().success(); |
| if (isWriteSurvey) |
| assertLogHas(node, "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled"); |
| |
| if (isWriteSurvey) |
| { |
| node.nodetoolResult("join").asserts().success(); |
| assertLogHas(node, "Leaving write survey mode and joining ring at operator request"); |
| } |
| |
| node.logs().watchFor("Starting listening for CQL clients"); |
| assertBootstrapState(node, "COMPLETED"); |
| } |
| |
| private static void assertBootstrapState(IInvokableInstance node, String expected) |
| { |
| SimpleQueryResult qr = node.executeInternalWithResult("SELECT bootstrapped FROM system.local WHERE key='local'"); |
| Assert.assertTrue("No rows found", qr.hasNext()); |
| Assert.assertEquals(expected, qr.next().getString("bootstrapped")); |
| } |
| |
| private static void assertLogHas(IInvokableInstance node, String msg) |
| { |
| LogResult<List<String>> results = node.logs().grep(msg); |
| Assert.assertFalse("Unable to find '" + msg + "'", results.getResult().isEmpty()); |
| } |
| |
| private void populate(IInvokableInstance inst) |
| { |
| for (int i = 0; i < 10; i++) |
| inst.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)", Integer.toString(i)); |
| } |
| |
| @Shared |
| public static final class RewriteEnabled |
| { |
| private static volatile boolean enabled = false; |
| |
| public static boolean isEnabled() |
| { |
| return enabled; |
| } |
| |
| public static void enable() |
| { |
| enabled = true; |
| } |
| |
| public static void disable() |
| { |
| enabled = false; |
| } |
| } |
| } |