blob: 13b314a224e4266f1870d89e0c39f32d58b2c628 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.junit.Assert;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class JVMDTestTest extends TestBaseImpl
{
@Test
public void insertTimestampTest() throws IOException
{
try (Cluster cluster = init(Cluster.build(1).start()))
{
long now = FBUtilities.timestampMicros();
cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, i int)");
cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, i) VALUES (1,1)", ConsistencyLevel.ALL);
cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, i) VALUES (2,2) USING TIMESTAMP 1000", ConsistencyLevel.ALL);
Object [][] res = cluster.coordinator(1).execute("SELECT writetime(i) FROM "+KEYSPACE+".tbl WHERE id = 1", ConsistencyLevel.ALL);
assertEquals(1, res.length);
assertTrue("ts="+res[0][0], (long)res[0][0] >= now);
res = cluster.coordinator(1).execute("SELECT writetime(i) FROM "+KEYSPACE+".tbl WHERE id = 2", ConsistencyLevel.ALL);
assertEquals(1, res.length);
assertEquals(1000, (long) res[0][0]);
}
}
@Test
public void instanceLogs() throws IOException, TimeoutException
{
try (Cluster cluster = init(Cluster.build(2).withConfig(c -> c.with(Feature.values())).start()))
{
// debug logging is turned on so we will see debug logs
assertFalse(cluster.get(1).logs().grep("^DEBUG").getResult().isEmpty());
// make sure an exception is thrown in the cluster
LogAction logs = cluster.get(2).logs();
long mark = logs.mark(); // get the current position so watching doesn't see any previous exceptions
cluster.get(2).runOnInstance(() -> {
// pretend that an uncaught exception was thrown
LoggerFactory.getLogger(CassandraDaemon.class).error("Error", new RuntimeException("fail without fail"));
});
List<String> errors = logs.watchFor(mark, "^ERROR").getResult();
assertFalse(errors.isEmpty());
}
}
@Test
public void nonSharedConfigClassTest() throws IOException
{
Map<String,Object> commitLogCompression = ImmutableMap.of("class_name", "org.apache.cassandra.io.compress.LZ4Compressor",
"parameters", Collections.<String,Object>emptyMap());
Map<String,Object> encryptionOptions = ImmutableMap.of("cipher_suites", Collections.singletonList("FakeCipher"),
"optional", false,
"enabled", false);
try (Cluster cluster = Cluster.build(1)
.withConfig(c -> {
c.set("concurrent_reads", 321);
c.set("internode_compression", Config.InternodeCompression.dc);
c.set("client_encryption_options", encryptionOptions);
c.set("commitlog_compression", commitLogCompression);
}).start())
{
cluster.get(1).runOnInstance(() -> {
assertEquals(321, DatabaseDescriptor.getConcurrentReaders());
assertEquals(Config.InternodeCompression.dc, DatabaseDescriptor.internodeCompression());
assertEquals("org.apache.cassandra.io.compress.LZ4Compressor", DatabaseDescriptor.getCommitLogCompression().class_name);
assertTrue(DatabaseDescriptor.getCommitLogCompression().parameters.isEmpty());
});
}
}
@Test
public void modifySchemaWithStoppedNode() throws Throwable
{
try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(c -> c.with(Feature.GOSSIP).with(Feature.NETWORK)).start()))
{
assertFalse(cluster.get(1).isShutdown());
assertFalse(cluster.get(2).isShutdown());
cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl1 (id int primary key, i int)");
cluster.get(2).shutdown(true).get(1, TimeUnit.MINUTES);
assertFalse(cluster.get(1).isShutdown());
assertTrue(cluster.get(2).isShutdown());
cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl2 (id int primary key, i int)");
cluster.get(1).shutdown(true).get(1, TimeUnit.MINUTES);
assertTrue(cluster.get(1).isShutdown());
assertTrue(cluster.get(2).isShutdown());
// both nodes down, nothing to record a schema change so should get an exception
Throwable thrown = null;
try
{
cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + KEYSPACE + ".tblX (id int primary key, i int)");
}
catch (Throwable tr)
{
thrown = tr;
}
assertNotNull("Expected to fail with all nodes down", thrown);
// Have to restart instance1 before instance2 as it is hard-coded as the seed in in-JVM configuration.
cluster.get(1).startup();
cluster.get(2).startup();
assertFalse(cluster.get(1).isShutdown());
assertFalse(cluster.get(2).isShutdown());
cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl3 (id int primary key, i int)");
assertRows(cluster.get(1).executeInternal("SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?", KEYSPACE),
row("tbl1"), row("tbl2"), row("tbl3"));
assertRows(cluster.get(2).executeInternal("SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?", KEYSPACE),
row("tbl1"), row("tbl2"), row("tbl3"));
// Finally test schema can be changed with the first node down
cluster.get(1).shutdown(true).get(1, TimeUnit.MINUTES);
cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE "+KEYSPACE+".tbl4 (id int primary key, i int)");
assertRows(cluster.get(2).executeInternal("SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?", KEYSPACE),
row("tbl1"), row("tbl2"), row("tbl3"), row("tbl4"));
}
}
}