blob: fb1de7c14cb3d015f6db04ef39ef21cb88b62dee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.UDHelper;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class CQLSSTableWriterTest
{
@BeforeClass
public static void setup() throws Exception
{
SchemaLoader.cleanupAndLeaveDirs();
Keyspace.setInitialized();
StorageService.instance.initServer();
}
@AfterClass
public static void tearDown() throws Exception
{
Config.setClientMode(false);
}
@Test
public void testUnsortedWriter() throws Exception
{
try (AutoCloseable switcher = Util.switchPartitioner(ByteOrderedPartitioner.instance))
{
String KS = "cql_keyspace";
String TABLE = "table1";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE cql_keyspace.table1 ("
+ " k int PRIMARY KEY,"
+ " v1 text,"
+ " v2 int"
+ ")";
String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using(insert).build();
writer.addRow(0, "test1", 24);
writer.addRow(1, "test2", 44);
writer.addRow(2, "test3", 42);
writer.addRow(ImmutableMap.<String, Object>of("k", 3, "v2", 12));
writer.close();
loadSSTables(writer.getInnermostDirectory(), KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
assertEquals(4, rs.size());
Iterator<UntypedResultSet.Row> iter = rs.iterator();
UntypedResultSet.Row row;
row = iter.next();
assertEquals(0, row.getInt("k"));
assertEquals("test1", row.getString("v1"));
assertEquals(24, row.getInt("v2"));
row = iter.next();
assertEquals(1, row.getInt("k"));
assertEquals("test2", row.getString("v1"));
//assertFalse(row.has("v2"));
assertEquals(44, row.getInt("v2"));
row = iter.next();
assertEquals(2, row.getInt("k"));
assertEquals("test3", row.getString("v1"));
assertEquals(42, row.getInt("v2"));
row = iter.next();
assertEquals(3, row.getInt("k"));
assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE
assertEquals(12, row.getInt("v2"));
}
}
@Test(expected = IllegalArgumentException.class)
public void testForbidCounterUpdates() throws Exception
{
String KS = "cql_keyspace";
String TABLE = "counter1";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE cql_keyspace.counter1 (" +
" my_id int, " +
" my_counter counter, " +
" PRIMARY KEY (my_id)" +
")";
String insert = String.format("UPDATE cql_keyspace.counter1 SET my_counter = my_counter - ? WHERE my_id = ?");
CQLSSTableWriter.builder().inDirectory(dataDir)
.forTable(schema)
.withPartitioner(Murmur3Partitioner.instance)
.using(insert).build();
}
@Test
public void testSyncWithinPartition() throws Exception
{
// Check that the write respect the buffer size even if we only insert rows withing the same partition (#7360)
// To do that simply, we use a writer with a buffer of 1MB, and write 2 rows in the same partition with a value
// > 1MB and validate that this created more than 1 sstable.
String KS = "ks";
String TABLE = "test";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE ks.test ("
+ " k int PRIMARY KEY,"
+ " v blob"
+ ")";
String insert = "INSERT INTO ks.test (k, v) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using(insert)
.withBufferSizeInMB(1)
.build();
ByteBuffer val = ByteBuffer.allocate(1024 * 1050);
writer.addRow(0, val);
writer.addRow(1, val);
writer.close();
FilenameFilter filterDataFiles = new FilenameFilter()
{
public boolean accept(File dir, String name)
{
return name.endsWith("-Data.db");
}
};
assert writer.getInnermostDirectory().list(filterDataFiles).length > 1 : Arrays.toString(writer.getInnermostDirectory().list(filterDataFiles));
}
@Test
public void testSyncNoEmptyRows() throws Exception
{
// Check that the write does not throw an empty partition error (#9071)
File tempdir = Files.createTempDir();
String schema = "CREATE TABLE ks.test2 ("
+ " k UUID,"
+ " c int,"
+ " PRIMARY KEY (k)"
+ ")";
String insert = "INSERT INTO ks.test2 (k, c) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(tempdir)
.forTable(schema)
.using(insert)
.withBufferSizeInMB(1)
.build();
for (int i = 0 ; i < 50000 ; i++) {
writer.addRow(UUID.randomUUID(), 0);
}
writer.close();
}
private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
private class WriterThread extends Thread
{
private final int id;
private final ColumnFamilyStore cfs;
public volatile Exception exception;
public WriterThread(ColumnFamilyStore cfs, int id)
{
this.cfs = cfs;
this.id = id;
}
@Override
public void run()
{
String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.withCfs(cfs)
.using(insert).build();
try
{
for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
{
writer.addRow(id, i);
}
writer.close();
}
catch (Exception e)
{
exception = e;
}
}
}
@Test
public void testConcurrentWriters() throws Exception
{
final String KS = "cql_keyspace2";
final String TABLE = "table2";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
String schema = "CREATE TABLE cql_keyspace2.table2 ("
+ " k int,"
+ " v int,"
+ " PRIMARY KEY (k, v)"
+ ")";
ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(schema, Collections.singletonList(dataDir));
WriterThread[] threads = new WriterThread[5];
for (int i = 0; i < threads.length; i++)
{
WriterThread thread = new WriterThread(cfs, i);
threads[i] = thread;
thread.start();
}
for (WriterThread thread : threads)
{
thread.join();
assert !thread.isAlive() : "Thread should be dead by now";
if (thread.exception != null)
{
throw thread.exception;
}
}
loadSSTables(cfs.getDirectories().getDirectoryForNewSSTables(), KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
}
@Test
@SuppressWarnings("unchecked")
public void testWritesWithUdts() throws Exception
{
final String KS = "cql_keyspace3";
final String TABLE = "table3";
final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ " k int,"
+ " v1 list<frozen<tuple2>>,"
+ " v2 frozen<tuple3>,"
+ " PRIMARY KEY (k)"
+ ")";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)")
.withType("CREATE TYPE " + KS + ".tuple3 (a int, b int, c int)")
.forTable(schema)
.using("INSERT INTO " + KS + "." + TABLE + " (k, v1, v2) " +
"VALUES (?, ?, ?)").build();
UserType tuple2Type = writer.getUDType("tuple2");
UserType tuple3Type = writer.getUDType("tuple3");
for (int i = 0; i < 100; i++)
{
writer.addRow(i,
ImmutableList.builder()
.add(tuple2Type.newValue()
.setInt("a", i * 10)
.setInt("b", i * 20))
.add(tuple2Type.newValue()
.setInt("a", i * 30)
.setInt("b", i * 40))
.build(),
tuple3Type.newValue()
.setInt("a", i * 100)
.setInt("b", i * 200)
.setInt("c", i * 300));
}
writer.close();
loadSSTables(writer.getInnermostDirectory(), KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
TypeCodec tuple3Codec = UDHelper.codecFor(tuple3Type);
assertEquals(resultSet.size(), 100);
int cnt = 0;
for (UntypedResultSet.Row row: resultSet) {
assertEquals(cnt,
row.getInt("k"));
List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"),
ProtocolVersion.NEWEST_SUPPORTED);
assertEquals(values.get(0).getInt("a"), cnt * 10);
assertEquals(values.get(0).getInt("b"), cnt * 20);
assertEquals(values.get(1).getInt("a"), cnt * 30);
assertEquals(values.get(1).getInt("b"), cnt * 40);
UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.NEWEST_SUPPORTED);
assertEquals(v2.getInt("a"), cnt * 100);
assertEquals(v2.getInt("b"), cnt * 200);
assertEquals(v2.getInt("c"), cnt * 300);
cnt++;
}
}
@Test
@SuppressWarnings("unchecked")
public void testWritesWithDependentUdts() throws Exception
{
final String KS = "cql_keyspace4";
final String TABLE = "table4";
final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ " k int,"
+ " v1 frozen<nested_tuple>,"
+ " PRIMARY KEY (k)"
+ ")";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.withType("CREATE TYPE " + KS + ".nested_tuple (c int, tpl frozen<tuple2>)")
.withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)")
.forTable(schema)
.using("INSERT INTO " + KS + "." + TABLE + " (k, v1) " +
"VALUES (?, ?)")
.build();
UserType tuple2Type = writer.getUDType("tuple2");
UserType nestedTuple = writer.getUDType("nested_tuple");
TypeCodec tuple2Codec = UDHelper.codecFor(tuple2Type);
TypeCodec nestedTupleCodec = UDHelper.codecFor(nestedTuple);
for (int i = 0; i < 100; i++)
{
writer.addRow(i,
nestedTuple.newValue()
.setInt("c", i * 100)
.set("tpl",
tuple2Type.newValue()
.setInt("a", i * 200)
.setInt("b", i * 300),
tuple2Codec));
}
writer.close();
loadSSTables(writer.getInnermostDirectory(), KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
assertEquals(resultSet.size(), 100);
int cnt = 0;
for (UntypedResultSet.Row row: resultSet) {
assertEquals(cnt,
row.getInt("k"));
UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"),
ProtocolVersion.NEWEST_SUPPORTED);
assertEquals(nestedTpl.getInt("c"), cnt * 100);
UDTValue tpl = nestedTpl.getUDTValue("tpl");
assertEquals(tpl.getInt("a"), cnt * 200);
assertEquals(tpl.getInt("b"), cnt * 300);
cnt++;
}
}
@Test
public void testUnsetValues() throws Exception
{
final String KS = "cql_keyspace5";
final String TABLE = "table5";
final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ " k int,"
+ " c1 int,"
+ " c2 int,"
+ " v text,"
+ " PRIMARY KEY (k, c1, c2)"
+ ")";
File tempdir = Files.createTempDir();
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
.using("INSERT INTO " + KS + "." + TABLE + " (k, c1, c2, v) " +
"VALUES (?, ?, ?, ?)")
.build();
try
{
writer.addRow(1, 1, 1);
fail("Passing less arguments then expected in prepared statement should not work.");
}
catch (InvalidRequestException e)
{
assertEquals("Invalid number of arguments, expecting 4 values but got 3",
e.getMessage());
}
try
{
writer.addRow(1, 1, CQLSSTableWriter.UNSET_VALUE, "1");
fail("Unset values should not work with clustering columns.");
}
catch (InvalidRequestException e)
{
assertEquals("Invalid unset value for column c2",
e.getMessage());
}
try
{
writer.addRow(ImmutableMap.<String, Object>builder().put("k", 1).put("c1", 1).put("v", CQLSSTableWriter.UNSET_VALUE).build());
fail("Unset or null clustering columns should not be allowed.");
}
catch (InvalidRequestException e)
{
assertEquals("Invalid null value in condition for column c2",
e.getMessage());
}
writer.addRow(1, 1, 1, CQLSSTableWriter.UNSET_VALUE);
writer.addRow(2, 2, 2, null);
writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE));
writer.addRow(ImmutableMap.<String, Object>builder()
.put("k", 4)
.put("c1", 4)
.put("c2", 4)
.put("v", CQLSSTableWriter.UNSET_VALUE)
.build());
writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE));
writer.addRow(5, 5, 5, "5");
writer.close();
loadSSTables(writer.getInnermostDirectory(), KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
UntypedResultSet.Row r1 = iter.next();
assertEquals(1, r1.getInt("k"));
assertEquals(1, r1.getInt("c1"));
assertEquals(1, r1.getInt("c2"));
assertEquals(false, r1.has("v"));
UntypedResultSet.Row r2 = iter.next();
assertEquals(2, r2.getInt("k"));
assertEquals(2, r2.getInt("c1"));
assertEquals(2, r2.getInt("c2"));
assertEquals(false, r2.has("v"));
UntypedResultSet.Row r3 = iter.next();
assertEquals(3, r3.getInt("k"));
assertEquals(3, r3.getInt("c1"));
assertEquals(3, r3.getInt("c2"));
assertEquals(false, r3.has("v"));
UntypedResultSet.Row r4 = iter.next();
assertEquals(4, r4.getInt("k"));
assertEquals(4, r4.getInt("c1"));
assertEquals(4, r4.getInt("c2"));
assertEquals(false, r3.has("v"));
UntypedResultSet.Row r5 = iter.next();
assertEquals(5, r5.getInt("k"));
assertEquals(5, r5.getInt("c1"));
assertEquals(5, r5.getInt("c2"));
assertEquals(true, r5.has("v"));
assertEquals("5", r5.getString("v"));
}
private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException
{
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
private String keyspace;
public void init(String keyspace)
{
this.keyspace = keyspace;
for (Range<Token> range : StorageService.instance.getLocalRanges(ks))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
}
public CFMetaData getTableMetadata(String cfName)
{
return Schema.instance.getCFMetaData(keyspace, cfName);
}
}, new OutputHandler.SystemOutput(false, false));
loader.stream().get();
}
}