blob: 7d79036ba103e712f8a194b2e2e20fb83b766d36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class CQLSSTableWriterTest
{
@BeforeClass
public static void setup() throws Exception
{
DatabaseDescriptor.setDaemonInitialized();
SchemaLoader.cleanupAndLeaveDirs();
Keyspace.setInitialized();
StorageService.instance.initServer();
}
@AfterClass
public static void tearDown() throws Exception
{
Config.setClientMode(false);
}
@Test
public void testUnsortedWriter() throws Exception
{
try (AutoCloseable switcher = Util.switchPartitioner(ByteOrderedPartitioner.instance))
{
String KS = "cql_keyspace";
String TABLE = "table1";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE cql_keyspace.table1 ("
+ " k int PRIMARY KEY,"
+ " v1 text,"
+ " v2 int"
+ ")";
String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using(insert).build();
writer.addRow(0, "test1", 24);
writer.addRow(1, "test2", 44);
writer.addRow(2, "test3", 42);
writer.addRow(ImmutableMap.<String, Object>of("k", 3, "v2", 12));
writer.close();
loadSSTables(dataDir, KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
assertEquals(4, rs.size());
Iterator<UntypedResultSet.Row> iter = rs.iterator();
UntypedResultSet.Row row;
row = iter.next();
assertEquals(0, row.getInt("k"));
assertEquals("test1", row.getString("v1"));
assertEquals(24, row.getInt("v2"));
row = iter.next();
assertEquals(1, row.getInt("k"));
assertEquals("test2", row.getString("v1"));
//assertFalse(row.has("v2"));
assertEquals(44, row.getInt("v2"));
row = iter.next();
assertEquals(2, row.getInt("k"));
assertEquals("test3", row.getString("v1"));
assertEquals(42, row.getInt("v2"));
row = iter.next();
assertEquals(3, row.getInt("k"));
assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE
assertEquals(12, row.getInt("v2"));
}
}
@Test
public void testForbidCounterUpdates() throws Exception
{
String KS = "cql_keyspace";
String TABLE = "counter1";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE cql_keyspace.counter1 (" +
" my_id int, " +
" my_counter counter, " +
" PRIMARY KEY (my_id)" +
")";
String insert = String.format("UPDATE cql_keyspace.counter1 SET my_counter = my_counter - ? WHERE my_id = ?");
try
{
CQLSSTableWriter.builder().inDirectory(dataDir)
.forTable(schema)
.withPartitioner(Murmur3Partitioner.instance)
.using(insert).build();
fail("Counter update statements should not be supported");
}
catch (IllegalArgumentException e)
{
assertEquals(e.getMessage(), "Counter update statements are not supported");
}
}
@Test
public void testSyncWithinPartition() throws Exception
{
// Check that the write respect the buffer size even if we only insert rows withing the same partition (#7360)
// To do that simply, we use a writer with a buffer of 1MB, and write 2 rows in the same partition with a value
// > 1MB and validate that this created more than 1 sstable.
String KS = "ks";
String TABLE = "test";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE ks.test ("
+ " k int PRIMARY KEY,"
+ " v blob"
+ ")";
String insert = "INSERT INTO ks.test (k, v) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using(insert)
.withBufferSizeInMB(1)
.build();
ByteBuffer val = ByteBuffer.allocate(1024 * 1050);
writer.addRow(0, val);
writer.addRow(1, val);
writer.close();
FilenameFilter filterDataFiles = new FilenameFilter()
{
public boolean accept(File dir, String name)
{
return name.endsWith("-Data.db");
}
};
assert dataDir.list(filterDataFiles).length > 1 : Arrays.toString(dataDir.list(filterDataFiles));
}
@Test
public void testSyncNoEmptyRows() throws Exception
{
// Check that the write does not throw an empty partition error (#9071)
File tempdir = Files.createTempDir();
String schema = "CREATE TABLE ks.test2 ("
+ " k UUID,"
+ " c int,"
+ " PRIMARY KEY (k)"
+ ")";
String insert = "INSERT INTO ks.test2 (k, c) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(tempdir)
.forTable(schema)
.using(insert)
.withBufferSizeInMB(1)
.build();
for (int i = 0 ; i < 50000 ; i++) {
writer.addRow(UUID.randomUUID(), 0);
}
writer.close();
}
@Test
public void testUpdateStatement() throws Exception
{
final String KS = "cql_keyspace6";
final String TABLE = "table6";
final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ " k int,"
+ " c1 int,"
+ " c2 int,"
+ " v text,"
+ " PRIMARY KEY (k, c1, c2)"
+ ")";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using("UPDATE " + KS + "." + TABLE + " SET v = ? " +
"WHERE k = ? AND c1 = ? AND c2 = ?")
.build();
writer.addRow("a", 1, 2, 3);
writer.addRow("b", 4, 5, 6);
writer.addRow(null, 7, 8, 9);
writer.close();
loadSSTables(dataDir, KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
assertEquals(2, resultSet.size());
Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
UntypedResultSet.Row r1 = iter.next();
assertEquals(1, r1.getInt("k"));
assertEquals(2, r1.getInt("c1"));
assertEquals(3, r1.getInt("c2"));
assertEquals("a", r1.getString("v"));
UntypedResultSet.Row r2 = iter.next();
assertEquals(4, r2.getInt("k"));
assertEquals(5, r2.getInt("c1"));
assertEquals(6, r2.getInt("c2"));
assertEquals("b", r2.getString("v"));
assertFalse(iter.hasNext());
}
@Test
public void testNativeFunctions() throws Exception
{
final String KS = "cql_keyspace7";
final String TABLE = "table7";
final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ " k int,"
+ " c1 int,"
+ " c2 int,"
+ " v blob,"
+ " PRIMARY KEY (k, c1, c2)"
+ ")";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using("INSERT INTO " + KS + "." + TABLE + " (k, c1, c2, v) VALUES (?, ?, ?, textAsBlob(?))")
.build();
writer.addRow(1, 2, 3, "abc");
writer.addRow(4, 5, 6, "efg");
writer.close();
loadSSTables(dataDir, KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
assertEquals(2, resultSet.size());
Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
UntypedResultSet.Row r1 = iter.next();
assertEquals(1, r1.getInt("k"));
assertEquals(2, r1.getInt("c1"));
assertEquals(3, r1.getInt("c2"));
assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v"));
UntypedResultSet.Row r2 = iter.next();
assertEquals(4, r2.getInt("k"));
assertEquals(5, r2.getInt("c1"));
assertEquals(6, r2.getInt("c2"));
assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v"));
assertFalse(iter.hasNext());
}
private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
private class WriterThread extends Thread
{
private final File dataDir;
private final int id;
public volatile Exception exception;
public WriterThread(File dataDir, int id)
{
this.dataDir = dataDir;
this.id = id;
}
@Override
public void run()
{
String schema = "CREATE TABLE cql_keyspace2.table2 ("
+ " k int,"
+ " v int,"
+ " PRIMARY KEY (k, v)"
+ ")";
String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using(insert).build();
try
{
for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
{
writer.addRow(id, i);
}
writer.close();
}
catch (Exception e)
{
exception = e;
}
}
}
@Test
public void testConcurrentWriters() throws Exception
{
final String KS = "cql_keyspace2";
final String TABLE = "table2";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
WriterThread[] threads = new WriterThread[5];
for (int i = 0; i < threads.length; i++)
{
WriterThread thread = new WriterThread(dataDir, i);
threads[i] = thread;
thread.start();
}
for (WriterThread thread : threads)
{
thread.join();
assert !thread.isAlive() : "Thread should be dead by now";
if (thread.exception != null)
{
throw thread.exception;
}
}
loadSSTables(dataDir, KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
}
private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException
{
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
private String keyspace;
public void init(String keyspace)
{
this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges(ks))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
}
public CFMetaData getTableMetadata(String cfName)
{
return Schema.instance.getCFMetaData(keyspace, cfName);
}
}, new OutputHandler.SystemOutput(false, false));
loader.stream().get();
}
}