blob: 4cf32781100d7db6d7b867a81e6122800c3ec6ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.StartupChecksOptions;
import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.JsonUtils;
import org.apache.cassandra.utils.Pair;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.cassandra.exceptions.StartupException.ERR_WRONG_DISK_STATE;
import static org.apache.cassandra.exceptions.StartupException.ERR_WRONG_MACHINE_STATE;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
public class DataResurrectionCheck implements StartupCheck
{
private static final Logger LOGGER = LoggerFactory.getLogger(DataResurrectionCheck.class);
public static final String HEARTBEAT_FILE_CONFIG_PROPERTY = "heartbeat_file";
public static final String EXCLUDED_KEYSPACES_CONFIG_PROPERTY = "excluded_keyspaces";
public static final String EXCLUDED_TABLES_CONFIG_PROPERTY = "excluded_tables";
public static final String DEFAULT_HEARTBEAT_FILE = "cassandra-heartbeat";
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Heartbeat
{
@JsonProperty("last_heartbeat")
public final Instant lastHeartbeat;
/** needed for jackson serialization */
@SuppressWarnings("unused")
private Heartbeat() {
this.lastHeartbeat = null;
}
public Heartbeat(Instant lastHeartbeat)
{
this.lastHeartbeat = lastHeartbeat;
}
public void serializeToJsonFile(File outputFile) throws IOException
{
JsonUtils.serializeToJsonFile(this, outputFile);
}
public static Heartbeat deserializeFromJsonFile(File file) throws IOException
{
return JsonUtils.deserializeFromJsonFile(Heartbeat.class, file);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Heartbeat manifest = (Heartbeat) o;
return Objects.equals(lastHeartbeat, manifest.lastHeartbeat);
}
@Override
public int hashCode()
{
return Objects.hash(lastHeartbeat);
}
}
@VisibleForTesting
static class TableGCPeriod
{
String table;
int gcPeriod;
TableGCPeriod(String table, int gcPeriod)
{
this.table = table;
this.gcPeriod = gcPeriod;
}
}
static File getHeartbeatFile(Map<String, Object> config)
{
String heartbeatFileConfigValue = (String) config.get(HEARTBEAT_FILE_CONFIG_PROPERTY);
File heartbeatFile;
if (heartbeatFileConfigValue != null)
{
heartbeatFile = new File(heartbeatFileConfigValue);
}
else
{
String[] dataFileLocations = DatabaseDescriptor.getLocalSystemKeyspacesDataFileLocations();
assert dataFileLocations.length != 0;
heartbeatFile = new File(dataFileLocations[0], DEFAULT_HEARTBEAT_FILE);
}
LOGGER.trace("Resolved heartbeat file for data resurrection check: " + heartbeatFile);
return heartbeatFile;
}
@Override
public StartupChecks.StartupCheckType getStartupCheckType()
{
return StartupChecks.StartupCheckType.check_data_resurrection;
}
@Override
public void execute(StartupChecksOptions options) throws StartupException
{
if (options.isDisabled(getStartupCheckType()))
return;
Map<String, Object> config = options.getConfig(StartupChecks.StartupCheckType.check_data_resurrection);
File heartbeatFile = getHeartbeatFile(config);
if (!heartbeatFile.exists())
{
LOGGER.debug("Heartbeat file {} not found! Skipping heartbeat startup check.", heartbeatFile.absolutePath());
return;
}
Heartbeat heartbeat;
try
{
heartbeat = Heartbeat.deserializeFromJsonFile(heartbeatFile);
}
catch (IOException ex)
{
throw new StartupException(ERR_WRONG_DISK_STATE, "Failed to deserialize heartbeat file " + heartbeatFile);
}
if (heartbeat.lastHeartbeat == null)
return;
long heartbeatMillis = heartbeat.lastHeartbeat.toEpochMilli();
List<Pair<String, String>> violations = new ArrayList<>();
Set<String> excludedKeyspaces = getExcludedKeyspaces(config);
Set<Pair<String, String>> excludedTables = getExcludedTables(config);
long currentTimeMillis = currentTimeMillis();
for (String keyspace : getKeyspaces())
{
if (excludedKeyspaces.contains(keyspace))
continue;
for (TableGCPeriod userTable : getTablesGcPeriods(keyspace))
{
if (excludedTables.contains(Pair.create(keyspace, userTable.table)))
continue;
long gcGraceMillis = ((long) userTable.gcPeriod) * 1000;
if (heartbeatMillis + gcGraceMillis < currentTimeMillis)
violations.add(Pair.create(keyspace, userTable.table));
}
}
if (!violations.isEmpty())
{
String invalidTables = violations.stream()
.map(p -> format("%s.%s", p.left, p.right))
.collect(joining(","));
String exceptionMessage = format("There are tables for which gc_grace_seconds is older " +
"than the lastly known time Cassandra node was up based " +
"on its heartbeat %s with timestamp %s. Cassandra node will not start " +
"as it would likely introduce data consistency " +
"issues (zombies etc). Please resolve these issues manually, " +
"then remove the heartbeat and start the node again. Invalid tables: %s",
heartbeatFile, heartbeat.lastHeartbeat, invalidTables);
throw new StartupException(ERR_WRONG_MACHINE_STATE, exceptionMessage);
}
}
@Override
public void postAction(StartupChecksOptions options)
{
// Schedule heartbeating after all checks have passed, not as part of the check,
// as it might happen that other checks after it might fail, but we would be heartbeating already.
if (options.isEnabled(StartupChecks.StartupCheckType.check_data_resurrection))
{
Map<String, Object> config = options.getConfig(StartupChecks.StartupCheckType.check_data_resurrection);
File heartbeatFile = DataResurrectionCheck.getHeartbeatFile(config);
ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() ->
{
Heartbeat heartbeat = new Heartbeat(Instant.ofEpochMilli(Clock.Global.currentTimeMillis()));
try
{
heartbeatFile.parent().createDirectoriesIfNotExists();
DataResurrectionCheck.LOGGER.trace("writing heartbeat to file " + heartbeatFile);
heartbeat.serializeToJsonFile(heartbeatFile);
}
catch (IOException ex)
{
DataResurrectionCheck.LOGGER.error("Unable to serialize heartbeat to " + heartbeatFile, ex);
}
}, 0, CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getInt(), MILLISECONDS);
}
}
@VisibleForTesting
public Set<String> getExcludedKeyspaces(Map<String, Object> config)
{
String excludedKeyspacesConfigValue = (String) config.get(EXCLUDED_KEYSPACES_CONFIG_PROPERTY);
if (excludedKeyspacesConfigValue == null)
return Collections.emptySet();
else
return Arrays.stream(excludedKeyspacesConfigValue.trim().split(","))
.map(String::trim)
.collect(toSet());
}
@VisibleForTesting
public Set<Pair<String, String>> getExcludedTables(Map<String, Object> config)
{
String excludedKeyspacesConfigValue = (String) config.get(EXCLUDED_TABLES_CONFIG_PROPERTY);
if (excludedKeyspacesConfigValue == null)
return Collections.emptySet();
Set<Pair<String, String>> pairs = new HashSet<>();
for (String keyspaceTable : excludedKeyspacesConfigValue.trim().split(","))
{
String[] pair = keyspaceTable.trim().split("\\.");
if (pair.length != 2)
continue;
pairs.add(Pair.create(pair[0].trim(), pair[1].trim()));
}
return pairs;
}
@VisibleForTesting
List<String> getKeyspaces()
{
return SchemaKeyspace.fetchNonSystemKeyspaces()
.stream()
.map(keyspaceMetadata -> keyspaceMetadata.name)
.collect(toList());
}
@VisibleForTesting
List<TableGCPeriod> getTablesGcPeriods(String userKeyspace)
{
Optional<KeyspaceMetadata> keyspaceMetadata = SchemaKeyspace.fetchNonSystemKeyspaces().get(userKeyspace);
if (!keyspaceMetadata.isPresent())
return Collections.emptyList();
KeyspaceMetadata ksmd = keyspaceMetadata.get();
return ksmd.tables.stream()
.filter(tmd -> tmd.params.gcGraceSeconds > 0)
.map(tmd -> new TableGCPeriod(tmd.name, tmd.params.gcGraceSeconds)).collect(toList());
}
}