// blob: 0bade4bee886c8dc1008ee0349d2fb3e1baff71c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.*;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;

import static java.lang.String.format;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.db.SystemKeyspace.LOCAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
 * Tests for the persistence helpers exposed by {@link SystemKeyspace}: saved tokens, the local
 * host id, snapshotting on release-version change, and the metadata row written to
 * {@code system.local}.
 */
public class SystemKeyspaceTest
{
    /**
     * One-time setup: runs {@link DatabaseDescriptor#daemonInitialization()} and then starts the
     * commit log. The order of the two calls is preserved from the original test.
     */
    @BeforeClass
    public static void prepSnapshotTracker()
    {
        DatabaseDescriptor.daemonInitialization();
        CommitLog.instance.start();
    }

    /**
     * Saves nine local tokens and verifies that {@link SystemKeyspace#getSavedTokens()} yields
     * exactly those tokens, in the order in which they were written.
     */
    @Test
    public void testLocalTokens()
    {
        // Re-save whatever tokens are currently recorded for the local endpoint, if any.
        // NOTE(review): this step used to be described as "remove all existing tokens", but the
        // call only writes the current tokens back; presumably the updateTokens(tokens) call below
        // is what replaces them - confirm against SystemKeyspace.updateTokens.
        Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddressAndPort());
        if (current != null && !current.isEmpty())
            SystemKeyspace.updateTokens(current);

        // Plain loop instead of double-brace initialization (which creates an anonymous subclass).
        List<Token> tokens = new ArrayList<>();
        for (int i = 0; i < 9; i++)
            tokens.add(new BytesToken(ByteBufferUtil.bytes(format("token%d", i))));

        SystemKeyspace.updateTokens(tokens);

        int count = 0;
        for (Token tok : SystemKeyspace.getSavedTokens())
        {
            assertTrue("more tokens were saved than were written", count < tokens.size());
            assertEquals(tokens.get(count), tok);
            count++;
        }
        // Without this check an empty (or truncated) result would let the loop pass vacuously.
        assertEquals(tokens.size(), count);
    }

    /**
     * Stores a token for a non-local endpoint, checks that it can be loaded back, then removes the
     * endpoint and checks that the token is no longer present.
     *
     * @throws UnknownHostException if the test address cannot be resolved
     */
    @Test
    public void testNonLocalToken() throws UnknownHostException
    {
        BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.2");

        SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token));
        assertTrue(SystemKeyspace.loadTokens().get(address).contains(token));

        SystemKeyspace.removeEndpoint(address);
        assertFalse(SystemKeyspace.loadTokens().containsValue(token));
    }

    /**
     * Verifies that two consecutive calls to {@link SystemKeyspace#getOrInitializeLocalHostId()}
     * return equal ids.
     */
    @Test
    public void testLocalHostID()
    {
        UUID firstId = SystemKeyspace.getOrInitializeLocalHostId();
        UUID secondId = SystemKeyspace.getOrInitializeLocalHostId();
        assertEquals(firstId, secondId);
    }

    /**
     * Asserts that no table in the system keyspace currently has a snapshot.
     */
    private void assertDeleted()
    {
        assertTrue(getSystemSnapshotFiles(SchemaConstants.SYSTEM_KEYSPACE_NAME).isEmpty());
    }

    /**
     * Exercises {@link SystemKeyspace#snapshotOnVersionChange()} in three situations: no recorded
     * previous version, an older recorded version, and a recorded version equal to the current one.
     * Only the middle case is expected to leave snapshots behind.
     *
     * @throws IOException declared by the original test signature
     */
    @Test
    public void snapshotSystemKeyspaceIfUpgrading() throws IOException
    {
        // First, check that in the absence of any previous installed version, we don't create snapshots
        for (ColumnFamilyStore cfs : Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStores())
            cfs.clearUnsafe();
        StorageService.instance.clearSnapshot(Collections.emptyMap(), null, SchemaConstants.SYSTEM_KEYSPACE_NAME);

        SystemKeyspace.snapshotOnVersionChange();
        assertDeleted();

        // now setup system.local as if we're upgrading from a previous version
        setupReleaseVersion(getOlderVersionString());
        StorageService.instance.clearSnapshot(Collections.emptyMap(), null, SchemaConstants.SYSTEM_KEYSPACE_NAME);
        assertDeleted();

        // Compare versions again & verify that snapshots were created for all tables in the system ks
        SystemKeyspace.snapshotOnVersionChange();

        Set<String> snapshottedSystemTables = getSystemSnapshotFiles(SchemaConstants.SYSTEM_KEYSPACE_NAME);
        SystemKeyspace.metadata().tables.forEach(
            t -> assertTrue("no snapshot found for system table " + t.name,
                            snapshottedSystemTables.contains(t.name)));

        Set<String> snapshottedSchemaTables = getSystemSnapshotFiles(SchemaConstants.SCHEMA_KEYSPACE_NAME);
        SchemaKeyspace.metadata().tables.forEach(
            t -> assertTrue("no snapshot found for schema table " + t.name,
                            snapshottedSchemaTables.contains(t.name)));

        // clear out the snapshots & set the previous recorded version equal to the latest, we shouldn't
        // see any new snapshots created this time.
        StorageService.instance.clearSnapshot(Collections.emptyMap(), null, SchemaConstants.SYSTEM_KEYSPACE_NAME);
        setupReleaseVersion(FBUtilities.getReleaseVersionString());

        SystemKeyspace.snapshotOnVersionChange();

        // The recorded version now equals the running version, so the system keyspace must
        // still be free of snapshots.
        assertDeleted();

        StorageService.instance.clearSnapshot(Collections.emptyMap(), null, SchemaConstants.SYSTEM_KEYSPACE_NAME);
    }

    /**
     * Calls {@link SystemKeyspace#persistLocalMetadata()} and checks every column it is expected to
     * have written to the {@code system.local} row against the corresponding configuration value.
     */
    @Test
    public void testPersistLocalMetadata()
    {
        SystemKeyspace.persistLocalMetadata();

        UntypedResultSet result = executeInternal(format("SELECT * FROM system.%s WHERE key='%s'", LOCAL, LOCAL));

        assertNotNull(result);
        UntypedResultSet.Row row = result.one();

        assertEquals(DatabaseDescriptor.getClusterName(), row.getString("cluster_name"));
        assertEquals(FBUtilities.getReleaseVersionString(), row.getString("release_version"));
        assertEquals(QueryProcessor.CQL_VERSION.toString(), row.getString("cql_version"));
        assertEquals(String.valueOf(ProtocolVersion.CURRENT.asInt()), row.getString("native_protocol_version"));
        assertEquals(DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(), row.getString("data_center"));
        assertEquals(DatabaseDescriptor.getEndpointSnitch().getLocalRack(), row.getString("rack"));
        assertEquals(DatabaseDescriptor.getPartitioner().getClass().getName(), row.getString("partitioner"));
        assertEquals(FBUtilities.getJustBroadcastNativeAddress(), row.getInetAddress("rpc_address"));
        assertEquals(DatabaseDescriptor.getNativeTransportPort(), row.getInt("rpc_port"));
        assertEquals(FBUtilities.getJustBroadcastAddress(), row.getInetAddress("broadcast_address"));
        assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("broadcast_port"));
        assertEquals(FBUtilities.getJustLocalAddress(), row.getInetAddress("listen_address"));
        assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("listen_port"));
    }

    /**
     * Builds a version string one major version below the running release.
     *
     * @return {@code "<major - 1>.<minor>.<patch>"} derived from the current release version, with
     *         everything from the first {@code '-'} onwards dropped before parsing
     */
    private String getOlderVersionString()
    {
        String version = FBUtilities.getReleaseVersionString();
        int qualifierStart = version.indexOf('-');
        String numericPart = qualifierStart >= 0 ? version.substring(0, qualifierStart) : version;
        CassandraVersion semver = new CassandraVersion(numericPart);
        return format("%s.%s.%s", semver.major - 1, semver.minor, semver.patch);
    }

    /**
     * Collects the names of the tables in {@code keyspace} that currently have at least one snapshot.
     *
     * @param keyspace name of the keyspace to inspect
     * @return the names of the tables whose {@code listSnapshots()} result is non-empty
     */
    private Set<String> getSystemSnapshotFiles(String keyspace)
    {
        Set<String> snapshottedTableNames = new HashSet<>();
        for (ColumnFamilyStore cfs : Keyspace.open(keyspace).getColumnFamilyStores())
        {
            if (!cfs.listSnapshots().isEmpty())
                snapshottedTableNames.add(cfs.getTableName());
        }
        return snapshottedTableNames;
    }

    /**
     * Writes {@code version} as the recorded release version in {@code system.local} and verifies
     * that it reads back unchanged.
     *
     * @param version the release version string to record
     */
    private void setupReleaseVersion(String version)
    {
        // besides the release_version, we also need to insert the cluster_name or the check
        // in SystemKeyspace.checkHealth where we verify it matches DatabaseDescriptor will fail
        executeInternal(format("INSERT INTO system.local(key, release_version, cluster_name) " +
                               "VALUES ('local', '%s', '%s')",
                               version,
                               DatabaseDescriptor.getClusterName()));
        String r = readLocalVersion();
        assertEquals(format("Expected %s, got %s", version, r), version, r);
    }

    /**
     * Reads the release version recorded in {@code system.local}.
     *
     * @return the stored {@code release_version}, or {@code null} if the row is missing or has no
     *         value for that column
     */
    private String readLocalVersion()
    {
        UntypedResultSet rs = executeInternal("SELECT release_version FROM system.local WHERE key='local'");
        if (rs.isEmpty())
            return null;

        // Fetch the row once rather than calling one() for both the check and the read.
        UntypedResultSet.Row row = rs.one();
        return row.has("release_version") ? row.getString("release_version") : null;
    }
}