| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db; |
| |
| import java.net.InetAddress; |
| import java.nio.ByteBuffer; |
| import java.util.UUID; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import org.apache.commons.lang3.StringUtils; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.cql3.CQLTester; |
| import org.apache.cassandra.cql3.UntypedResultSet; |
| import org.apache.cassandra.db.marshal.BytesType; |
| import org.apache.cassandra.db.marshal.Int32Type; |
| import org.apache.cassandra.db.marshal.TimeUUIDType; |
| import org.apache.cassandra.db.marshal.UTF8Type; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.io.sstable.SequenceBasedSSTableId; |
| import org.apache.cassandra.schema.SchemaConstants; |
| import org.apache.cassandra.utils.CassandraVersion; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.TimeUUID; |
| |
| import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; |
| import static org.junit.Assert.assertEquals; |
| |
| public class SystemKeyspaceMigrator41Test extends CQLTester |
| { |
| @Test |
| public void testMigratePeers() throws Throwable |
| { |
| String legacyTab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS); |
| String tab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2); |
| String insert = String.format("INSERT INTO %s (" |
| + "peer, " |
| + "data_center, " |
| + "host_id, " |
| + "preferred_ip, " |
| + "rack, " |
| + "release_version, " |
| + "rpc_address, " |
| + "schema_version, " |
| + "tokens) " |
| + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?)", |
| legacyTab); |
| UUID hostId = UUID.randomUUID(); |
| UUID schemaVersion = UUID.randomUUID(); |
| execute(insert, |
| InetAddress.getByName("127.0.0.1"), |
| "dcFoo", |
| hostId, |
| InetAddress.getByName("127.0.0.2"), |
| "rackFoo", "4.0", |
| InetAddress.getByName("127.0.0.3"), |
| schemaVersion, |
| ImmutableSet.of("foobar")); |
| SystemKeyspaceMigrator41.migratePeers(); |
| |
| int rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer")); |
| assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port")); |
| assertEquals("dcFoo", row.getString("data_center")); |
| assertEquals(hostId, row.getUUID("host_id")); |
| assertEquals(InetAddress.getByName("127.0.0.2"), row.getInetAddress("preferred_ip")); |
| assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("preferred_port")); |
| assertEquals("rackFoo", row.getString("rack")); |
| assertEquals("4.0", row.getString("release_version")); |
| assertEquals(InetAddress.getByName("127.0.0.3"), row.getInetAddress("native_address")); |
| assertEquals(DatabaseDescriptor.getNativeTransportPort(), row.getInt("native_port")); |
| assertEquals(schemaVersion, row.getUUID("schema_version")); |
| assertEquals(ImmutableSet.of("foobar"), row.getSet("tokens", UTF8Type.instance)); |
| } |
| assertEquals(1, rowCount); |
| |
| //Test nulls/missing don't prevent the row from propagating |
| execute(String.format("TRUNCATE %s", legacyTab)); |
| execute(String.format("TRUNCATE %s", tab)); |
| |
| execute(String.format("INSERT INTO %s (peer) VALUES (?)", legacyTab), |
| InetAddress.getByName("127.0.0.1")); |
| SystemKeyspaceMigrator41.migratePeers(); |
| |
| rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| } |
| assertEquals(1, rowCount); |
| } |
| |
| @Test |
| public void testMigratePeerEvents() throws Throwable |
| { |
| String legacyTab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS); |
| String tab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2); |
| String insert = String.format("INSERT INTO %s (" |
| + "peer, " |
| + "hints_dropped) " |
| + " values ( ?, ? )", |
| legacyTab); |
| TimeUUID uuid = nextTimeUUID(); |
| execute(insert, |
| InetAddress.getByName("127.0.0.1"), |
| ImmutableMap.of(uuid, 42)); |
| SystemKeyspaceMigrator41.migratePeerEvents(); |
| |
| int rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer")); |
| assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port")); |
| assertEquals(ImmutableMap.of(uuid, 42), row.getMap("hints_dropped", TimeUUIDType.instance, Int32Type.instance)); |
| } |
| assertEquals(1, rowCount); |
| |
| //Test nulls/missing don't prevent the row from propagating |
| execute(String.format("TRUNCATE %s", legacyTab)); |
| execute(String.format("TRUNCATE %s", tab)); |
| |
| execute(String.format("INSERT INTO %s (peer) VALUES (?)", legacyTab), |
| InetAddress.getByName("127.0.0.1")); |
| SystemKeyspaceMigrator41.migratePeerEvents(); |
| |
| rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| } |
| assertEquals(1, rowCount); |
| } |
| |
| @Test |
| public void testMigrateTransferredRanges() throws Throwable |
| { |
| String legacyTab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES); |
| String tab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2); |
| String insert = String.format("INSERT INTO %s (" |
| + "operation, " |
| + "peer, " |
| + "keyspace_name, " |
| + "ranges) " |
| + " values ( ?, ?, ?, ? )", |
| legacyTab); |
| execute(insert, |
| "foo", |
| InetAddress.getByName("127.0.0.1"), |
| "bar", |
| ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 }))); |
| SystemKeyspaceMigrator41.migrateTransferredRanges(); |
| |
| int rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| assertEquals("foo", row.getString("operation")); |
| assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer")); |
| assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port")); |
| assertEquals("bar", row.getString("keyspace_name")); |
| assertEquals(ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 })), row.getSet("ranges", BytesType.instance)); |
| } |
| assertEquals(1, rowCount); |
| |
| //Test nulls/missing don't prevent the row from propagating |
| execute(String.format("TRUNCATE %s", legacyTab)); |
| execute(String.format("TRUNCATE %s", tab)); |
| |
| execute(String.format("INSERT INTO %s (operation, peer, keyspace_name) VALUES (?, ?, ?)", legacyTab), |
| "foo", |
| InetAddress.getByName("127.0.0.1"), |
| "bar"); |
| SystemKeyspaceMigrator41.migrateTransferredRanges(); |
| |
| rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| } |
| assertEquals(1, rowCount); |
| } |
| |
| @Test |
| public void testMigrateAvailableRanges() throws Throwable |
| { |
| String legacyTab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_AVAILABLE_RANGES); |
| String tab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.AVAILABLE_RANGES_V2); |
| Range<Token> testRange = new Range<>(DatabaseDescriptor.getPartitioner().getRandomToken(), DatabaseDescriptor.getPartitioner().getRandomToken()); |
| String insert = String.format("INSERT INTO %s (" |
| + "keyspace_name, " |
| + "ranges) " |
| + " values ( ?, ? )", |
| legacyTab); |
| execute(insert, |
| "foo", |
| ImmutableSet.of(SystemKeyspace.rangeToBytes(testRange))); |
| SystemKeyspaceMigrator41.migrateAvailableRanges(); |
| |
| int rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| assertEquals("foo", row.getString("keyspace_name")); |
| assertEquals(ImmutableSet.of(testRange), SystemKeyspace.rawRangesToRangeSet(row.getSet("full_ranges", BytesType.instance), DatabaseDescriptor.getPartitioner())); |
| } |
| assertEquals(1, rowCount); |
| } |
| |
| @Test |
| public void testMigrateSSTableActivity() throws Throwable |
| { |
| FBUtilities.setPreviousReleaseVersionString(CassandraVersion.NULL_VERSION.toString()); |
| String legacyTab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_SSTABLE_ACTIVITY); |
| String tab = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.SSTABLE_ACTIVITY_V2); |
| |
| String insert = String.format("INSERT INTO %s (%s) VALUES (%s)", |
| legacyTab, |
| StringUtils.join(new String[] {"keyspace_name", |
| "columnfamily_name", |
| "generation", |
| "rate_120m", |
| "rate_15m"}, ", "), |
| StringUtils.repeat("?", ", ", 5)); |
| |
| execute(insert, "ks", "tab", 5, 123.234d, 345.456d); |
| |
| ColumnFamilyStore cf = getColumnFamilyStore(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.SSTABLE_ACTIVITY_V2); |
| cf.truncateBlocking(); |
| cf.clearUnsafe(); |
| SystemKeyspaceMigrator41.migrateSSTableActivity(); |
| |
| int rowCount = 0; |
| for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", tab))) |
| { |
| rowCount++; |
| assertEquals("ks", row.getString("keyspace_name")); |
| assertEquals("tab", row.getString("table_name")); |
| assertEquals(new SequenceBasedSSTableId(5).toString(), row.getString("id")); |
| assertEquals(123.234d, row.getDouble("rate_120m"), 0.001d); |
| assertEquals(345.456d, row.getDouble("rate_15m"), 0.001d); |
| } |
| assertEquals(1, rowCount); |
| } |
| } |