| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.distributed.test; |
| |
| import java.util.EnumMap; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.junit.Test; |
| |
| import org.apache.cassandra.config.CassandraRelevantProperties; |
| import org.apache.cassandra.config.StartupChecksOptions; |
| import org.apache.cassandra.distributed.Cluster; |
| import org.apache.cassandra.distributed.api.IInvokableInstance; |
| import org.apache.cassandra.distributed.api.IIsolatedExecutor; |
| import org.apache.cassandra.exceptions.StartupException; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.service.DataResurrectionCheck; |
| import org.apache.cassandra.service.DataResurrectionCheck.Heartbeat; |
| import org.apache.cassandra.service.StartupChecks.StartupCheckType; |
| import org.apache.cassandra.utils.Clock.Global; |
| |
| import static java.lang.String.format; |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY; |
| import static org.apache.cassandra.distributed.Cluster.build; |
| import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; |
| import static org.apache.cassandra.service.DataResurrectionCheck.DEFAULT_HEARTBEAT_FILE; |
| import static org.apache.cassandra.service.DataResurrectionCheck.EXCLUDED_KEYSPACES_CONFIG_PROPERTY; |
| import static org.apache.cassandra.service.DataResurrectionCheck.EXCLUDED_TABLES_CONFIG_PROPERTY; |
| import static org.apache.cassandra.service.StartupChecks.StartupCheckType.check_data_resurrection; |
| import static org.awaitility.Awaitility.await; |
| import static org.hamcrest.Matchers.containsString; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| |
| public class DataResurrectionCheckTest extends TestBaseImpl |
| { |
| @Test |
| public void testDataResurrectionCheck() throws Exception |
| { |
| try |
| { |
| // set it to 1 hour so check will be not updated after it is written, for test purposes |
| System.setProperty(CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getKey(), "3600000"); |
| |
| // start the node with the check enabled, it will just pass fine as there are not any user tables yet |
| // and system tables are young enough |
| try (Cluster cluster = build().withNodes(1) |
| .withDataDirCount(3) // we will expect heartbeat to be in the first data dir |
| .withConfig(config -> config.with(NATIVE_PROTOCOL) |
| .set("startup_checks", |
| getStartupChecksConfig(ENABLED_PROPERTY, "true"))) |
| .start()) |
| { |
| IInvokableInstance instance = cluster.get(1); |
| |
| checkHeartbeat(instance); |
| |
| for (String ks : new String[]{ "ks1", "ks2", "ks3" }) |
| { |
| cluster.schemaChange("CREATE KEYSPACE " + ks + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"); |
| cluster.schemaChange(format("CREATE TABLE %s.tb1 (pk text PRIMARY KEY) WITH gc_grace_seconds = 10", ks)); |
| cluster.schemaChange(format("CREATE TABLE %s.tb2 (pk text PRIMARY KEY)", ks)); |
| } |
| |
| AtomicReference<Throwable> throwable = new AtomicReference<>(); |
| // periodically execute check on a running instance and wait until the exception is thrown on all keyspaces |
| // wait for all violations by Awaitility as due to nature how tables were created, |
| // they will not expire on their gc_grace_period exactly at the same time |
| await().timeout(1, MINUTES) |
| .pollInterval(5, SECONDS) |
| .until(() -> { |
| Throwable t = executeChecksOnInstance(instance); |
| if (t == null) |
| return false; |
| String message = t.getMessage(); |
| if (!message.contains("ks1") || !message.contains("ks2") || !message.contains("ks3")) |
| { |
| return false; |
| } |
| throwable.set(t); |
| return true; |
| }); |
| |
| assertThat(throwable.get().getMessage(), containsString("Invalid tables")); |
| // returned tables in output are not in any particular order |
| // it is how they are returned from system tables |
| assertThat(throwable.get().getMessage(), containsString("ks1.tb1")); |
| assertThat(throwable.get().getMessage(), containsString("ks2.tb1")); |
| assertThat(throwable.get().getMessage(), containsString("ks3.tb1")); |
| |
| // exclude failing keyspaces which already expired on their gc_grace_seconds, so we will pass the check |
| assertNull(executeChecksOnInstance(instance, EXCLUDED_KEYSPACES_CONFIG_PROPERTY, "ks1,ks2,ks3")); |
| |
| // exclude failing tables which already expired on their gc_grace_seconds, so we will pass the check |
| assertNull(executeChecksOnInstance(instance, EXCLUDED_TABLES_CONFIG_PROPERTY, "ks1.tb1,ks2.tb1,ks3.tb1")); |
| |
| // exclude failing tables, but not all of them, |
| // so check detects only one table violates the check |
| Throwable t = executeChecksOnInstance(instance, EXCLUDED_TABLES_CONFIG_PROPERTY, "ks1.tb1,ks2.tb1"); |
| |
| assertNotNull(t); |
| assertThat(t.getMessage(), containsString("Invalid tables: ks3.tb1")); |
| |
| // shadow table exclusion with keyspace exclusion, we have not excluded ks3.tb1, but we excluded whole ks3 |
| assertNull(executeChecksOnInstance(instance, |
| EXCLUDED_TABLES_CONFIG_PROPERTY, "ks1.tb1,ks2.tb1", |
| EXCLUDED_KEYSPACES_CONFIG_PROPERTY, "ks3")); |
| } |
| } |
| finally |
| { |
| System.clearProperty(CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getKey()); |
| } |
| } |
| |
| private Throwable executeChecksOnInstance(IInvokableInstance instance, final String... config) |
| { |
| assert config.length % 2 == 0; |
| return instance.callsOnInstance((IIsolatedExecutor.SerializableCallable<Throwable>) () -> |
| { |
| try |
| { |
| DataResurrectionCheck check = new DataResurrectionCheck(); |
| StartupChecksOptions startupChecksOptions = new StartupChecksOptions(); |
| startupChecksOptions.enable(check_data_resurrection); |
| |
| for (int i = 0; i < config.length - 1; i = i + 2) |
| startupChecksOptions.set(check_data_resurrection, config[i], config[i + 1]); |
| |
| check.execute(startupChecksOptions); |
| return null; |
| } |
| catch (StartupException e) |
| { |
| return e; |
| } |
| }).call(); |
| } |
| |
| private Map<StartupCheckType, Map<String, Object>> getStartupChecksConfig(String... configs) |
| { |
| return new EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) |
| {{ |
| put(check_data_resurrection, |
| new HashMap<String, Object>() |
| {{ |
| for (int i = 0; i < configs.length - 1; i = i + 2) |
| put(configs[i], configs[i + 1]); |
| }}); |
| }}; |
| } |
| |
| private void checkHeartbeat(IInvokableInstance instance) throws Exception |
| { |
| File heartbeatFile = new File(((String[]) instance.config().get("data_file_directories"))[0], |
| DEFAULT_HEARTBEAT_FILE); |
| assertTrue(heartbeatFile.exists()); |
| Heartbeat heartbeat = Heartbeat.deserializeFromJsonFile(heartbeatFile); |
| assertNotNull(heartbeat.lastHeartbeat); |
| assertTrue(heartbeat.lastHeartbeat.toEpochMilli() < Global.currentTimeMillis()); |
| } |
| } |