blob: e0a58baf637d1525a5f9ff2c8a75c3a979772cb7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
/**
* Migrates the 3.0 versions of some system tables to their 4.0 replacements. The 4.0 tables differ from the
* legacy ones only by some extra columns and some changed keys.
*
* The additional columns can't simply be added to the existing tables because they are primary key columns, and
* C* doesn't support changing key columns, even when they are just clustering columns.
*/
public class SystemKeyspaceMigrator40
{
static final String legacyPeersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS);
static final String peersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2);
static final String legacyPeerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS);
static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
static final String legacyAvailableRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_AVAILABLE_RANGES);
static final String availableRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.AVAILABLE_RANGES_V2);
private static final Logger logger = LoggerFactory.getLogger(SystemKeyspaceMigrator40.class);
private SystemKeyspaceMigrator40() {}
public static void migrate()
{
migratePeers();
migratePeerEvents();
migrateTransferredRanges();
migrateAvailableRanges();
}
private static void migratePeers()
{
ColumnFamilyStore newPeers = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2);
if (!newPeers.isEmpty())
return;
logger.info("{} table was empty, migrating legacy {}, if this fails you should fix the issue and then truncate {} to have it try again.",
peersName, legacyPeersName, peersName);
String query = String.format("SELECT * FROM %s",
legacyPeersName);
String insert = String.format("INSERT INTO %s ( "
+ "peer, "
+ "peer_port, "
+ "data_center, "
+ "host_id, "
+ "preferred_ip, "
+ "preferred_port, "
+ "rack, "
+ "release_version, "
+ "native_address, "
+ "native_port, "
+ "schema_version, "
+ "tokens) "
+ " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?, ?, ?, ?)",
peersName);
UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
int transferred = 0;
logger.info("Migrating rows from legacy {} to {}", legacyPeersName, peersName);
for (UntypedResultSet.Row row : rows)
{
logger.debug("Transferring row {}", transferred);
QueryProcessor.executeInternal(insert,
row.has("peer") ? row.getInetAddress("peer") : null,
DatabaseDescriptor.getStoragePort(),
row.has("data_center") ? row.getString("data_center") : null,
row.has("host_id") ? row.getUUID("host_id") : null,
row.has("preferred_ip") ? row.getInetAddress("preferred_ip") : null,
DatabaseDescriptor.getStoragePort(),
row.has("rack") ? row.getString("rack") : null,
row.has("release_version") ? row.getString("release_version") : null,
row.has("rpc_address") ? row.getInetAddress("rpc_address") : null,
DatabaseDescriptor.getNativeTransportPort(),
row.has("schema_version") ? row.getUUID("schema_version") : null,
row.has("tokens") ? row.getSet("tokens", UTF8Type.instance) : null);
transferred++;
}
logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeersName, peersName);
}
private static void migratePeerEvents()
{
ColumnFamilyStore newPeerEvents = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEER_EVENTS_V2);
if (!newPeerEvents.isEmpty())
return;
logger.info("{} table was empty, migrating legacy {} to {}", peerEventsName, legacyPeerEventsName, peerEventsName);
String query = String.format("SELECT * FROM %s",
legacyPeerEventsName);
String insert = String.format("INSERT INTO %s ( "
+ "peer, "
+ "peer_port, "
+ "hints_dropped) "
+ " values ( ?, ?, ? )",
peerEventsName);
UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
int transferred = 0;
for (UntypedResultSet.Row row : rows)
{
logger.debug("Transferring row {}", transferred);
QueryProcessor.executeInternal(insert,
row.has("peer") ? row.getInetAddress("peer") : null,
DatabaseDescriptor.getStoragePort(),
row.has("hints_dropped") ? row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance) : null);
transferred++;
}
logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeerEventsName, peerEventsName);
}
static void migrateTransferredRanges()
{
ColumnFamilyStore newTransferredRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.TRANSFERRED_RANGES_V2);
if (!newTransferredRanges.isEmpty())
return;
logger.info("{} table was empty, migrating legacy {} to {}", transferredRangesName, legacyTransferredRangesName, transferredRangesName);
String query = String.format("SELECT * FROM %s",
legacyTransferredRangesName);
String insert = String.format("INSERT INTO %s ("
+ "operation, "
+ "peer, "
+ "peer_port, "
+ "keyspace_name, "
+ "ranges) "
+ " values ( ?, ?, ? , ?, ?)",
transferredRangesName);
UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
int transferred = 0;
for (UntypedResultSet.Row row : rows)
{
logger.debug("Transferring row {}", transferred);
QueryProcessor.executeInternal(insert,
row.has("operation") ? row.getString("operation") : null,
row.has("peer") ? row.getInetAddress("peer") : null,
DatabaseDescriptor.getStoragePort(),
row.has("keyspace_name") ? row.getString("keyspace_name") : null,
row.has("ranges") ? row.getSet("ranges", BytesType.instance) : null);
transferred++;
}
logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyTransferredRangesName, transferredRangesName);
}
static void migrateAvailableRanges()
{
ColumnFamilyStore newAvailableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.AVAILABLE_RANGES_V2);
if (!newAvailableRanges.isEmpty())
return;
logger.info("{} table was empty, migrating legacy {} to {}", availableRangesName, legacyAvailableRangesName, availableRangesName);
String query = String.format("SELECT * FROM %s",
legacyAvailableRangesName);
String insert = String.format("INSERT INTO %s ("
+ "keyspace_name, "
+ "full_ranges, "
+ "transient_ranges) "
+ " values ( ?, ?, ? )",
availableRangesName);
UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
int transferred = 0;
for (UntypedResultSet.Row row : rows)
{
logger.debug("Transferring row {}", transferred);
String keyspace = row.getString("keyspace_name");
Set<ByteBuffer> ranges = Optional.ofNullable(row.getSet("ranges", BytesType.instance)).orElse(Collections.emptySet());
QueryProcessor.executeInternal(insert,
keyspace,
ranges,
Collections.emptySet());
transferred++;
}
logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyAvailableRangesName, availableRangesName);
}
}