blob: 5d20367663e643bf1d1e1fd34c1354996bf60881 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;
import java.nio.ByteBuffer;
import org.apache.cassandra.io.util.File;
import org.junit.Test;
import org.apache.cassandra.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.TimeUUID;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
/**
 * Tests for {@link SSTableWriter} exercising what is left on disk after a
 * {@link LifecycleTransaction} is aborted, how an over-sized value is reported on read,
 * and which combinations of repair metadata a writer accepts.
 */
public class SSTableWriterTest extends SSTableWriterTestBase
{
    /** Number of partitions appended by each write phase of the abort tests. */
    private static final int PARTITIONS_PER_BATCH = 10000;

    /** Number of rows written into every generated partition. */
    private static final int ROWS_PER_PARTITION = 100;

    /** Size in bytes of the buffer stored in the "val" column of every generated row. */
    private static final int VALUE_SIZE = 1000;

    /**
     * Opens the writer early twice while data is still being appended, releases both early
     * readers, then aborts the writer and the transaction and expects the file count reported
     * by {@code assertFileCounts} to drop from one to zero.
     */
    @Test
    public void testAbortTxnWithOpenEarlyShouldRemoveSSTable()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
        truncate(cfs);

        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
        try (SSTableWriter writer = getWriter(cfs, dir, txn))
        {
            appendLargePartitions(cfs, writer, 0, PARTITIONS_PER_BATCH);
            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
            // JUnit assertion instead of the assert keyword, so the check runs even without -ea
            assertTrue("openEarly() should return a reader", s != null);
            assertFileCounts(dir.tryListNames());

            appendLargePartitions(cfs, writer, PARTITIONS_PER_BATCH, 2 * PARTITIONS_PER_BATCH);
            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
            // the second early reader must cover keys beyond the end of the first one
            assertTrue(s.last.compareTo(s2.last) < 0);
            assertFileCounts(dir.tryListNames());
            s.selfRef().release();
            s2.selfRef().release();

            int datafiles = assertFileCounts(dir.tryListNames());
            assertEquals(1, datafiles);

            // releasing the early readers alone must not change what is on disk
            LifecycleTransaction.waitForDeletions();
            assertFileCounts(dir.tryListNames());

            writer.abort();
            txn.abort();
            LifecycleTransaction.waitForDeletions();
            datafiles = assertFileCounts(dir.tryListNames());
            assertEquals(0, datafiles);
            validateCFS(cfs);
        }
    }

    /**
     * Finishes the writer, releases the resulting reader, then aborts the transaction and
     * expects the file count reported by {@code assertFileCounts} to drop from one to zero.
     */
    @Test
    public void testAbortTxnWithClosedWriterShouldRemoveSSTable()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
        truncate(cfs);

        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
        try (SSTableWriter writer = getWriter(cfs, dir, txn))
        {
            appendLargePartitions(cfs, writer, 0, PARTITIONS_PER_BATCH);
            assertFileCounts(dir.tryListNames());
            appendLargePartitions(cfs, writer, PARTITIONS_PER_BATCH, 2 * PARTITIONS_PER_BATCH);

            SSTableReader sstable = writer.finish(true);
            int datafiles = assertFileCounts(dir.tryListNames());
            assertEquals(1, datafiles);

            sstable.selfRef().release();
            // releasing the reader alone must not change what is on disk
            LifecycleTransaction.waitForDeletions();
            assertFileCounts(dir.tryListNames());

            txn.abort();
            LifecycleTransaction.waitForDeletions();
            datafiles = assertFileCounts(dir.tryListNames());
            assertEquals(0, datafiles);
            validateCFS(cfs);
        }
    }

    /**
     * Uses two writers on the same transaction, finishes only the first one, then aborts the
     * transaction and expects the file count reported by {@code assertFileCounts} to drop
     * from two to zero.
     */
    @Test
    public void testAbortTxnWithClosedAndOpenWriterShouldRemoveAllSSTables()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
        truncate(cfs);

        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);

        SSTableWriter writer1 = getWriter(cfs, dir, txn);
        SSTableWriter writer2 = getWriter(cfs, dir, txn);
        try
        {
            appendLargePartitions(cfs, writer1, 0, PARTITIONS_PER_BATCH);
            assertFileCounts(dir.tryListNames());
            appendLargePartitions(cfs, writer2, PARTITIONS_PER_BATCH, 2 * PARTITIONS_PER_BATCH);

            // writer2 is intentionally left unfinished
            SSTableReader sstable = writer1.finish(true);
            txn.update(sstable, false);

            int datafiles = assertFileCounts(dir.tryListNames());
            assertEquals(2, datafiles);

            LifecycleTransaction.waitForDeletions();
            assertFileCounts(dir.tryListNames());

            txn.abort();
            LifecycleTransaction.waitForDeletions();
            datafiles = assertFileCounts(dir.tryListNames());
            assertEquals(0, datafiles);
            validateCFS(cfs);
        }
        finally
        {
            writer1.close();
            writer2.close();
        }
    }

    /**
     * Writes a single row holding a 2 * 1024 * 1024 byte value into the
     * {@code CF_SMALL_MAX_VALUE} table and expects reading that partition back to fail with
     * a {@link CorruptSSTableException}.
     */
    @Test
    public void testValueTooBigCorruption() throws InterruptedException
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
        truncate(cfs);

        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);

        try (SSTableWriter writer1 = getWriter(cfs, dir, txn))
        {
            UpdateBuilder largeValue = UpdateBuilder.create(cfs.metadata(), "large_value").withTimestamp(1);
            largeValue.newRow("clustering").add("val", ByteBuffer.allocate(2 * 1024 * 1024));
            writer1.append(largeValue.build().unfilteredIterator());

            SSTableReader sstable = writer1.finish(true);
            txn.update(sstable, false);

            DecoratedKey dk = Util.dk("large_value");
            // try-with-resources so the iterator is closed whether or not the read fails
            try (UnfilteredRowIterator rowIter = sstable.rowIterator(dk,
                                                                     Slices.ALL,
                                                                     ColumnFilter.all(cfs.metadata()),
                                                                     false,
                                                                     SSTableReadsListener.NOOP_LISTENER))
            {
                while (rowIter.hasNext())
                {
                    // no-op read, as values may not appear expected
                    rowIter.next();
                }
                fail("Expected a CorruptSSTableException to be thrown");
            }
            catch (CorruptSSTableException e)
            {
                // expected: this exception is the outcome the test is looking for
            }

            txn.abort();
            LifecycleTransaction.waitForDeletions();
        }
    }

    /**
     * Appends one partition per key index in {@code [fromKey, toKey)} to {@code writer}.
     * Every partition has timestamp 1 and {@link #ROWS_PER_PARTITION} rows, each storing a
     * {@link #VALUE_SIZE}-byte buffer in the "val" column.
     *
     * @param cfs     table whose metadata is used to build the updates
     * @param writer  writer receiving the partitions
     * @param fromKey first key index to write (inclusive)
     * @param toKey   key index to stop at (exclusive)
     */
    private void appendLargePartitions(ColumnFamilyStore cfs, SSTableWriter writer, int fromKey, int toKey)
    {
        for (int key = fromKey; key < toKey; key++)
        {
            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), random(key, 10)).withTimestamp(1);
            for (int row = 0; row < ROWS_PER_PARTITION; row++)
                builder.newRow("" + row).add("val", ByteBuffer.allocate(VALUE_SIZE));
            writer.append(builder.build().unfilteredIterator());
        }
    }

    /**
     * Asserts that a writer can be created with the given repair metadata, i.e. that
     * {@code getWriter} does not throw an {@link IllegalArgumentException}. The transaction
     * is aborted afterwards regardless of the outcome.
     *
     * @param repairedAt    repairedAt value handed to the writer
     * @param pendingRepair pending repair id handed to the writer
     * @param isTransient   transient flag handed to the writer
     */
    private static void assertValidRepairMetadata(long repairedAt, TimeUUID pendingRepair, boolean isTransient)
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);

        try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient))
        {
            // nothing to write: creating the writer without an exception is the whole check
        }
        catch (IllegalArgumentException e)
        {
            throw new AssertionError("Unexpected IllegalArgumentException", e);
        }
        finally
        {
            // clean up even when the assertion fails
            txn.abort();
            LifecycleTransaction.waitForDeletions();
        }
    }

    /**
     * Asserts that creating a writer with the given repair metadata is rejected with an
     * {@link IllegalArgumentException}. The transaction is aborted afterwards regardless of
     * the outcome.
     *
     * @param repairedAt    repairedAt value handed to the writer
     * @param pendingRepair pending repair id handed to the writer
     * @param isTransient   transient flag handed to the writer
     */
    private static void assertInvalidRepairMetadata(long repairedAt, TimeUUID pendingRepair, boolean isTransient)
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);

        try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient))
        {
            fail("Expected IllegalArgumentException");
        }
        catch (IllegalArgumentException e)
        {
            // expected
        }
        finally
        {
            // clean up even when the writer was unexpectedly created
            txn.abort();
            LifecycleTransaction.waitForDeletions();
        }
    }

    /**
     * It should only be possible to create sstables marked transient that also have a pending repair.
     * Also checks that a repairedAt value of 1 combined with a pending repair id is rejected.
     */
    @Test
    public void testRepairMetadataValidation()
    {
        assertValidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false);
        assertValidRepairMetadata(1, NO_PENDING_REPAIR, false);
        assertValidRepairMetadata(UNREPAIRED_SSTABLE, nextTimeUUID(), false);
        assertValidRepairMetadata(UNREPAIRED_SSTABLE, nextTimeUUID(), true);

        assertInvalidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, true);
        assertInvalidRepairMetadata(1, nextTimeUUID(), false);
        assertInvalidRepairMetadata(1, NO_PENDING_REPAIR, true);
    }
}