// blob: a922b282d958e93d49906c16dcc1a3a3c23b30b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Tables;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
/**
 * Schema definition and write helpers for the {@code system_distributed} keyspace.
 *
 * <p>The keyspace holds two tables, {@code repair_history} and {@code parent_repair_history}.
 * The static methods of this class insert and update rows in those tables to record the
 * start, success and failure of repairs. All writes go through {@link #processSilent}, which
 * logs failures instead of propagating them, so recording history never fails the caller.
 *
 * <p>This class only has static members and cannot be instantiated.
 */
public final class SystemDistributedKeyspace
{
    private SystemDistributedKeyspace()
    {
    }

    private static final Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class);

    /** Name of the keyspace all tables below are created in. */
    public static final String NAME = "system_distributed";

    /**
     * Generation is used as a timestamp for automatic table creation on startup.
     * If you make any changes to the tables below, make sure to increment the
     * generation and document your change here.
     *
     * gen 0: original definition in 2.2
     * gen 1: (pre-)add options column to parent_repair_history in 3.0, 3.11
     * gen 2: (pre-)add coordinator_port and participants_v2 columns to repair_history in 3.0, 3.11, 4.0
     */
    public static final long GENERATION = 2;

    /** Table with one row per (keyspace, table, repair id). */
    public static final String REPAIR_HISTORY = "repair_history";

    /** Table with one row per parent repair id. */
    public static final String PARENT_REPAIR_HISTORY = "parent_repair_history";

    private static final CFMetaData RepairHistory =
        compile(REPAIR_HISTORY,
                "Repair history",
                "CREATE TABLE %s ("
                + "keyspace_name text,"
                + "columnfamily_name text,"
                + "id timeuuid,"
                + "parent_id timeuuid,"
                + "range_begin text,"
                + "range_end text,"
                + "coordinator inet,"
                + "coordinator_port int,"
                + "participants set<inet>,"
                + "participants_v2 set<text>,"
                + "exception_message text,"
                + "exception_stacktrace text,"
                + "status text,"
                + "started_at timestamp,"
                + "finished_at timestamp,"
                + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");

    private static final CFMetaData ParentRepairHistory =
        compile(PARENT_REPAIR_HISTORY,
                "Repair history",
                "CREATE TABLE %s ("
                + "parent_id timeuuid,"
                + "keyspace_name text,"
                + "columnfamily_names set<text>,"
                + "started_at timestamp,"
                + "finished_at timestamp,"
                + "exception_message text,"
                + "exception_stacktrace text,"
                + "requested_ranges set<text>,"
                + "successful_ranges set<text>,"
                + "options map<text, text>,"
                + "PRIMARY KEY (parent_id))");

    /**
     * Builds the metadata of one table of this keyspace.
     *
     * @param name        table name, substituted for the {@code %s} placeholder in {@code schema}
     * @param description table comment
     * @param schema      {@code CREATE TABLE} statement containing a single {@code %s} for the table name
     * @return the compiled table metadata, bound to keyspace {@link #NAME}
     */
    private static CFMetaData compile(String name, String description, String schema)
    {
        return CFMetaData.compile(String.format(schema, name), NAME)
                         .comment(description);
    }

    /**
     * @return metadata of the keyspace, containing both repair history tables and created
     *         with {@code KeyspaceParams.simple(3)}
     */
    public static KeyspaceMetadata metadata()
    {
        return KeyspaceMetadata.create(NAME, KeyspaceParams.simple(3), Tables.of(RepairHistory, ParentRepairHistory));
    }

    /**
     * Inserts the {@code parent_repair_history} row marking the start of a parent repair.
     *
     * @param parent_id    id of the parent repair (primary key of the row)
     * @param keyspaceName keyspace being repaired
     * @param cfnames      tables being repaired, stored in {@code columnfamily_names}
     * @param ranges       requested ranges, stored (via their {@code toString()}) in {@code requested_ranges}
     */
    public static void startParentRepair(UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges)
    {
        String query = "INSERT INTO %s.%s (parent_id, keyspace_name, columnfamily_names, requested_ranges, started_at)"+
                       " VALUES (%s, '%s', { '%s' }, { '%s' }, toTimestamp(now()))";
        String fmtQry = String.format(query,
                                      NAME,
                                      PARENT_REPAIR_HISTORY,
                                      parent_id.toString(),
                                      keyspaceName,
                                      Joiner.on("','").join(cfnames),
                                      Joiner.on("','").join(ranges));
        processSilent(fmtQry);
    }

    /**
     * Marks a parent repair as finished with an error, recording the message and stack trace of {@code t}.
     *
     * @param parent_id id of the parent repair
     * @param t         the failure; its message may be null, in which case a null is stored
     */
    public static void failParentRepair(UUID parent_id, Throwable t)
    {
        String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE parent_id=%s";
        String stackTrace = stackTraceOf(t);
        String fmtQuery = String.format(query, NAME, PARENT_REPAIR_HISTORY, parent_id.toString());
        processSilent(fmtQuery, t.getMessage(), stackTrace);
    }

    /**
     * Marks a parent repair as finished and records the ranges that were repaired successfully.
     *
     * @param parent_id        id of the parent repair
     * @param successfulRanges ranges stored (via their {@code toString()}) in {@code successful_ranges}
     */
    public static void successfulParentRepair(UUID parent_id, Collection<Range<Token>> successfulRanges)
    {
        String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), successful_ranges = {'%s'} WHERE parent_id=%s";
        String fmtQuery = String.format(query,
                                        NAME,
                                        PARENT_REPAIR_HISTORY,
                                        Joiner.on("','").join(successfulRanges),
                                        parent_id.toString());
        processSilent(fmtQuery);
    }

    /**
     * Inserts one {@code repair_history} row with status {@code STARTED} for every
     * (table, range) combination.
     *
     * <p>The coordinator is this node's broadcast address; it is always part of the
     * recorded participants, together with every address in {@code endpoints}.
     *
     * @param id           repair id
     * @param parent_id    id of the owning parent repair
     * @param keyspaceName keyspace being repaired
     * @param cfnames      tables being repaired
     * @param ranges       ranges being repaired; {@code left}/{@code right} are stored as text
     * @param endpoints    the other nodes taking part in the repair
     */
    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
    {
        String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
        Set<String> participants = Sets.newHashSet(coordinator);
        for (InetAddress endpoint : endpoints)
            participants.add(endpoint.getHostAddress());

        // The participant list is identical for every row, so render it once.
        String participantList = Joiner.on("', '").join(participants);

        String query =
            "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
            "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))";

        for (String cfname : cfnames)
        {
            for (Range<Token> range : ranges)
            {
                String fmtQry = String.format(query, NAME, REPAIR_HISTORY,
                                              keyspaceName,
                                              cfname,
                                              id.toString(),
                                              parent_id.toString(),
                                              range.left.toString(),
                                              range.right.toString(),
                                              coordinator,
                                              participantList,
                                              RepairState.STARTED.toString());
                processSilent(fmtQry);
            }
        }
    }

    /**
     * Marks the repair of every given table as failed; see {@link #failedRepairJob}.
     *
     * @param id           repair id
     * @param keyspaceName keyspace being repaired
     * @param cfnames      tables whose repair rows are updated
     * @param t            the failure to record
     */
    public static void failRepairs(UUID id, String keyspaceName, String[] cfnames, Throwable t)
    {
        for (String cfname : cfnames)
            failedRepairJob(id, keyspaceName, cfname, t);
    }

    /**
     * Sets the status of a table's repair row to {@code SUCCESS} and stamps {@code finished_at}.
     *
     * @param id           repair id
     * @param keyspaceName keyspace being repaired
     * @param cfname       table being repaired
     */
    public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
    {
        String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
        String fmtQuery = String.format(query, NAME, REPAIR_HISTORY,
                                        RepairState.SUCCESS.toString(),
                                        keyspaceName,
                                        cfname,
                                        id.toString());
        processSilent(fmtQuery);
    }

    /**
     * Sets the status of a table's repair row to {@code FAILED}, stamps {@code finished_at}
     * and records the message and stack trace of {@code t}.
     *
     * @param id           repair id
     * @param keyspaceName keyspace being repaired
     * @param cfname       table being repaired
     * @param t            the failure; its message may be null, in which case a null is stored
     */
    public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable t)
    {
        String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
        String stackTrace = stackTraceOf(t);
        String fmtQry = String.format(query, NAME, REPAIR_HISTORY,
                                      RepairState.FAILED.toString(),
                                      keyspaceName,
                                      cfname,
                                      id.toString());
        processSilent(fmtQry, t.getMessage(), stackTrace);
    }

    /**
     * Renders the stack trace of {@code t} exactly as {@link Throwable#printStackTrace(PrintWriter)} prints it.
     */
    private static String stackTraceOf(Throwable t)
    {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        t.printStackTrace(pw);
        return sw.toString();
    }

    /**
     * Executes {@code fmtQry} at {@link ConsistencyLevel#ONE}, binding {@code values} to its
     * {@code ?} markers in order.
     *
     * <p>Any {@link Throwable} raised while preparing or executing the query is logged and
     * not rethrown.
     *
     * @param fmtQry the fully formatted CQL statement
     * @param values bind values; a null element is bound as a null value
     */
    private static void processSilent(String fmtQry, String... values)
    {
        try
        {
            List<ByteBuffer> valueList = new ArrayList<>(values.length);
            for (String v : values)
            {
                // Throwable.getMessage() may legitimately be null; encoding it would throw and,
                // because of the catch below, silently drop the whole failure record.
                valueList.add(v == null ? null : ByteBufferUtil.bytes(v));
            }
            QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList);
        }
        catch (Throwable t)
        {
            logger.error("Error executing query {}", fmtQry, t);
        }
    }

    /** Values written to the {@code status} column of {@code repair_history}. */
    private enum RepairState
    {
        STARTED, SUCCESS, FAILED
    }
}