blob: 8f3061a0427ae0f869f922af974bf5890409981d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import junit.framework.Assert;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@RunWith(OrderedJUnit4ClassRunner.class)
public class StreamingTransferTest
{
    private static final Logger logger = LoggerFactory.getLogger(StreamingTransferTest.class);

    static
    {
        // Static initialisers run in textual order, so this runs before LOCAL below is computed.
        // NOTE(review): presumably getBroadcastAddress() needs the configuration loaded here — confirm.
        DatabaseDescriptor.daemonInitialization();
    }

    public static final InetAddress LOCAL = FBUtilities.getBroadcastAddress();
    public static final String KEYSPACE1 = "StreamingTransferTest1";
    public static final String CF_STANDARD = "Standard1";
    public static final String CF_COUNTER = "Counter1";
    public static final String CF_STANDARDINT = "StandardInteger1";
    public static final String CF_INDEX = "Indexed1";
    public static final String KEYSPACE_CACHEKEY = "KeyStreamingTransferTestSpace";
    public static final String CF_STANDARD2 = "Standard2";
    public static final String CF_STANDARD3 = "Standard3";
    public static final String KEYSPACE2 = "StreamingTransferTest2";

    /**
     * Prepares and initialises the local server, then creates the keyspaces and tables used by the tests in this
     * class (KEYSPACE2 is created without any tables).
     */
    @BeforeClass
    public static void defineSchema() throws Exception
    {
        SchemaLoader.prepareServer();
        StorageService.instance.initServer();
        SchemaLoader.createKeyspace(KEYSPACE1,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
                                    CFMetaData.Builder.create(KEYSPACE1, CF_COUNTER, false, true, true)
                                                      .addPartitionKey("key", BytesType.instance)
                                                      .build(),
                                    CFMetaData.Builder.create(KEYSPACE1, CF_STANDARDINT)
                                                      .addPartitionKey("key", AsciiType.instance)
                                                      .addClusteringColumn("cols", Int32Type.instance)
                                                      .addRegularColumn("val", BytesType.instance)
                                                      .build(),
                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEX, true));
        SchemaLoader.createKeyspace(KEYSPACE2,
                                    KeyspaceParams.simple(1));
        SchemaLoader.createKeyspace(KEYSPACE_CACHEKEY,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD),
                                    SchemaLoader.standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD2),
                                    SchemaLoader.standardCFMD(KEYSPACE_CACHEKEY, CF_STANDARD3));
    }

    /**
     * Test if empty {@link StreamPlan} returns success with empty result.
     * <p>
     * The result is inspected on the test thread (rather than inside a future callback), so a failed check always
     * fails this test no matter which thread completes the future.
     */
    @Test
    public void testEmptyStreamPlan() throws Exception
    {
        StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest").execute();
        UUID planId = futureResult.planId;

        // should be complete immediately; get() throws if the plan failed or did not finish in time
        StreamState result = futureResult.get(100, TimeUnit.MILLISECONDS);
        assert planId.equals(result.planId);
        assert result.description.equals("StreamingTransferTest");
        assert result.sessions.isEmpty();
    }

    /**
     * Requesting ranges from an empty keyspace should complete with a single session that moved nothing.
     */
    @Test
    public void testRequestEmpty() throws Exception
    {
        // requesting empty data should succeed
        IPartitioner p = Util.testPartitioner();
        List<Range<Token>> ranges = new ArrayList<>();
        ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));

        StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest")
                                                  .requestRanges(LOCAL, LOCAL, KEYSPACE2, ranges)
                                                  .execute();

        UUID planId = futureResult.planId;
        StreamState result = futureResult.get();
        assert planId.equals(result.planId);
        assert result.description.equals("StreamingTransferTest");

        // we should have completed session with empty transfer
        assert result.sessions.size() == 1;
        SessionInfo session = Iterables.get(result.sessions, 0);
        assert session.peer.equals(LOCAL);
        assert session.getTotalFilesReceived() == 0;
        assert session.getTotalFilesSent() == 0;
        assert session.getTotalSizeReceived() == 0;
        assert session.getTotalSizeSent() == 0;
    }

    /**
     * Create and transfer a single sstable, and return the keys that should have been transferred.
     * The Mutator must create the given column, but it may also create any other columns it pleases.
     *
     * @param cfs              the table to write into and stream back into
     * @param mutator          writes one row per call, for keys key1..key3 and columns col1..col3
     * @param transferSSTables {@code true} to stream the flushed sstable itself, {@code false} to stream by token range
     * @return the keys expected to be present after the transfer, in the order they are read back
     */
    private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception
    {
        // write a temporary SSTable, and unregister it
        logger.debug("Mutating {}", cfs.name);
        long timestamp = 1234;
        for (int i = 1; i <= 3; i++)
            mutator.mutate("key" + i, "col" + i, timestamp);
        cfs.forceBlockingFlush();
        Util.compactAll(cfs, Integer.MAX_VALUE).get();
        assertEquals(1, cfs.getLiveSSTables().size());

        // transfer a subset of the keys: key1 and key3 when streaming the sstable, key2 and key3 when streaming ranges
        logger.debug("Transferring {}", cfs.name);
        int[] offs;
        if (transferSSTables)
        {
            SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
            cfs.clearUnsafe();
            transferSSTables(sstable);
            offs = new int[]{1, 3};
        }
        else
        {
            long beforeStreaming = System.currentTimeMillis();
            transferRanges(cfs);
            // drop the sstables that existed before streaming so only the streamed data remains
            cfs.discardSSTables(beforeStreaming);
            offs = new int[]{2, 3};
        }

        // confirm that a single SSTable was transferred and registered
        assertEquals(1, cfs.getLiveSSTables().size());

        // and that the index and filter were properly recovered
        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(Util.cmd(cfs).build());
        assertEquals(offs.length, partitions.size());
        for (int i = 0; i < offs.length; i++)
        {
            String key = "key" + offs[i];
            String col = "col" + offs[i];

            assert !Util.getAll(Util.cmd(cfs, key).build()).isEmpty();
            ImmutableBTreePartition partition = partitions.get(i);
            assert ByteBufferUtil.compareUnsigned(partition.partitionKey().getKey(), ByteBufferUtil.bytes(key)) == 0;
            assert ByteBufferUtil.compareUnsigned(partition.iterator().next().clustering().get(0), ByteBufferUtil.bytes(col)) == 0;
        }

        // and that the max timestamp for the file was rediscovered
        assertEquals(timestamp, cfs.getLiveSSTables().iterator().next().getMaxTimestamp());

        List<String> keys = new ArrayList<>();
        for (int off : offs)
            keys.add("key" + off);

        logger.debug("... everything looks good for {}", cfs.name);
        return keys;
    }

    /**
     * Streams {@code sstable} to the local node, restricted to the ranges (min, key1] and (key2, min].
     */
    private void transferSSTables(SSTableReader sstable) throws Exception
    {
        IPartitioner p = sstable.getPartitioner();
        List<Range<Token>> ranges = new ArrayList<>();
        ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
        transfer(sstable, ranges);
    }

    /**
     * Streams the wrapped range (key1, key0] of {@code cfs} to the local node, then checks that the finished plan
     * refuses further ranges.
     */
    private void transferRanges(ColumnFamilyStore cfs) throws Exception
    {
        IPartitioner p = cfs.getPartitioner();
        List<Range<Token>> ranges = new ArrayList<>();
        // wrapped range
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
        streamPlan.execute().get();
        verifyConnectionsAreClosed();

        // cannot add ranges after stream session is finished
        try
        {
            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
            fail("Should have thrown exception");
        }
        catch (RuntimeException expected)
        {
            // expected: the finished plan rejects new ranges
        }
    }

    /**
     * Streams {@code sstable} to the local node, restricted to {@code ranges}. It waits for the stream to finish,
     * then checks that the finished plan refuses further files.
     */
    private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
    {
        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, acquireRefs(sstable)));
        streamPlan.execute().get();
        verifyConnectionsAreClosed();

        // cannot add files after stream session is finished
        Refs<SSTableReader> rejectedRefs = acquireRefs(sstable);
        try
        {
            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, rejectedRefs));
            fail("Should have thrown exception");
        }
        catch (RuntimeException expected)
        {
            // Expected: the finished plan rejected the files. Release the references we acquired for it rather than
            // leaking them.
            // NOTE(review): assumes the session rejects finished plans before taking ownership of the refs
            // (StreamSession.addTransferFiles checks for a finished session first) — confirm if that changes.
            rejectedRefs.release();
        }
    }

    /**
     * Acquires a reference to {@code sstable} for streaming. A {@code null} result from {@link Refs#tryRef} fails
     * the test with a descriptive error instead of a later NullPointerException; the disabled tests below
     * null-check tryRef the same way.
     */
    private static Refs<SSTableReader> acquireRefs(SSTableReader sstable)
    {
        Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable));
        if (refs == null)
            throw new AssertionError("Could not acquire a reference to " + sstable);
        return refs;
    }

    /**
     * Test that finished incoming connections are removed from MessagingService (CASSANDRA-11854).
     * <p>
     * After the stream session is finished, message handlers may take several milliseconds to be closed. This
     * method therefore polls up to 101 times, sleeping 100ms after each unsuccessful check, before failing.
     */
    private void verifyConnectionsAreClosed() throws InterruptedException
    {
        for (int attempt = 0; attempt <= 100; attempt++)
        {
            if (noConnectionsRegistered())
                return;
            Thread.sleep(100);
        }
        fail("Streaming connections remain registered in MessagingService");
    }

    /**
     * Returns {@code true} when no MessagingService socket thread has any connection registered.
     */
    private static boolean noConnectionsRegistered()
    {
        for (MessagingService.SocketThread socketThread : MessagingService.instance().getSocketThreads())
        {
            if (!socketThread.connections.isEmpty())
                return false;
        }
        return true;
    }

    /**
     * Builds one streaming section per sstable in {@code sstables}. Each section covers the positions of that
     * sstable that fall within {@code ranges}, and is handed the reference that {@code sstables} holds for it.
     */
    private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
    {
        List<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
        for (SSTableReader sstable : sstables)
        {
            details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable),
                                                                   sstable.getPositionsForRanges(ranges),
                                                                   sstable.estimatedKeysForRanges(ranges),
                                                                   sstable.getSSTableMetadata().repairedAt));
        }
        return details;
    }

    /**
     * Writes three rows with a "birthdate" value into the indexed table and transfers a subset of them. It then
     * checks that each transferred key can be found with a query on birthdate.
     *
     * @param transferSSTables {@code true} to stream the sstable itself, {@code false} to stream by token range
     */
    private void doTransferTable(boolean transferSSTables) throws Exception
    {
        final Keyspace keyspace = Keyspace.open(KEYSPACE1);
        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_INDEX);

        List<String> keys = createAndTransfer(cfs, new Mutator()
        {
            public void mutate(String key, String col, long timestamp) throws Exception
            {
                long val = key.hashCode();

                RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, timestamp, key);
                builder.clustering(col).add("birthdate", ByteBufferUtil.bytes(val));
                builder.build().applyUnsafe();
            }
        }, transferSSTables);

        // confirm that the secondary index was recovered
        for (String key : keys)
        {
            long val = key.hashCode();

            // test we can search:
            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE birthdate = %d",
                                                                                   cfs.metadata.ksName, cfs.metadata.cfName, val));
            assertEquals(1, result.size());

            assert result.iterator().next().getBytes("key").equals(ByteBufferUtil.bytes(key));
        }
    }

    /**
     * Test to make sure RangeTombstones at column index boundary transferred correctly.
     */
    @Test
    public void testTransferRangeTombstones() throws Exception
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARDINT);

        String key = "key1";

        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
        // add columns of size slightly less than column_index_size to force insert column index
        updates.clustering(1)
               .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
               .build()
               .apply();

        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
        updates.clustering(6)
               .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
               .build()
               .apply();

        // add a RangeTombstone over clusterings 5..7, written after the rows; the assertions below expect it to
        // shadow the row at clustering 6
        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
        updates.addRangeTombstone(5, 7)
               .build()
               .apply();

        cfs.forceBlockingFlush();

        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
        cfs.clearUnsafe();
        transferSSTables(sstable);

        // confirm that a single SSTable was transferred and registered
        assertEquals(1, cfs.getLiveSSTables().size());

        // only the row at clustering 1 should be left
        Row r = Util.getOnlyRow(Util.cmd(cfs).build());
        Assert.assertFalse(r.isEmpty());
        assertEquals(1, Int32Type.instance.compose(r.clustering().get(0)).intValue());
    }

    @Test
    public void testTransferTableViaRanges() throws Exception
    {
        doTransferTable(false);
    }

    @Test
    public void testTransferTableViaSSTables() throws Exception
    {
        doTransferTable(true);
    }

    // NOTE(review): the tests below are commented out. They reference types and members (e.g. ColumnFamily,
    // ArrayBackedSortedColumns, QueryFilter, Row.cf) that the active tests in this class do not use, so presumably
    // they are awaiting a port — confirm before re-enabling.
    /*
    @Test
    public void testTransferTableCounter() throws Exception
    {
        final Keyspace keyspace = Keyspace.open(KEYSPACE1);
        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Counter1");
        final CounterContext cc = new CounterContext();
        final Map<String, ColumnFamily> cleanedEntries = new HashMap<>();
        List<String> keys = createAndTransfer(cfs, new Mutator()
        {
            // Creates a new SSTable per key: all will be merged before streaming.
            public void mutate(String key, String col, long timestamp) throws Exception
            {
                Map<String, ColumnFamily> entries = new HashMap<>();
                ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
                ColumnFamily cfCleaned = ArrayBackedSortedColumns.factory.create(cfs.metadata);
                CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 1, 3);
                state.writeLocal(CounterId.fromInt(2), 9L, 3L);
                state.writeRemote(CounterId.fromInt(4), 4L, 2L);
                state.writeRemote(CounterId.fromInt(6), 3L, 3L);
                state.writeRemote(CounterId.fromInt(8), 2L, 4L);
                cf.addColumn(new BufferCounterCell(cellname(col), state.context, timestamp));
                cfCleaned.addColumn(new BufferCounterCell(cellname(col), cc.clearAllLocal(state.context), timestamp));
                entries.put(key, cf);
                cleanedEntries.put(key, cfCleaned);
                cfs.addSSTable(SSTableUtils.prepare()
                    .ks(keyspace.getName())
                    .cf(cfs.name)
                    .generation(0)
                    .write(entries));
            }
        }, true);
        // filter pre-cleaned entries locally, and ensure that the end result is equal
        cleanedEntries.keySet().retainAll(keys);
        SSTableReader cleaned = SSTableUtils.prepare()
            .ks(keyspace.getName())
            .cf(cfs.name)
            .generation(0)
            .write(cleanedEntries);
        SSTableReader streamed = cfs.getLiveSSTables().iterator().next();
        SSTableUtils.assertContentEquals(cleaned, streamed);
        // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
        cfs.clearUnsafe();
        transferSSTables(streamed);
        SSTableReader restreamed = cfs.getLiveSSTables().iterator().next();
        SSTableUtils.assertContentEquals(streamed, restreamed);
    }
    @Test
    public void testTransferTableMultiple() throws Exception
    {
        // write temporary SSTables, but don't register them
        Set<String> content = new HashSet<>();
        content.add("test");
        content.add("test2");
        content.add("test3");
        SSTableReader sstable = new SSTableUtils(KEYSPACE1, CF_STANDARD).prepare().write(content);
        String keyspaceName = sstable.getKeyspaceName();
        String cfname = sstable.getColumnFamilyName();
        content = new HashSet<>();
        content.add("transfer1");
        content.add("transfer2");
        content.add("transfer3");
        SSTableReader sstable2 = SSTableUtils.prepare().write(content);
        // transfer the first and last key
        IPartitioner p = Util.testPartitioner();
        List<Range<Token>> ranges = new ArrayList<>();
        ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test"))));
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken()));
        // Acquiring references, transferSSTables needs it
        Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable, sstable2));
        assert refs != null;
        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
        // confirm that the sstables were transferred and registered and that 2 keys arrived
        ColumnFamilyStore cfstore = Keyspace.open(keyspaceName).getColumnFamilyStore(cfname);
        List<Row> rows = Util.getRangeSlice(cfstore);
        assertEquals(2, rows.size());
        assert rows.get(0).key.getKey().equals(ByteBufferUtil.bytes("test"));
        assert rows.get(1).key.getKey().equals(ByteBufferUtil.bytes("transfer3"));
        assert rows.get(0).cf.getColumnCount() == 1;
        assert rows.get(1).cf.getColumnCount() == 1;
        // these keys fall outside of the ranges and should not be transferred
        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), "Standard1", System.currentTimeMillis())) == null;
        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), "Standard1", System.currentTimeMillis())) == null;
        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), "Standard1", System.currentTimeMillis())) == null;
        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), "Standard1", System.currentTimeMillis())) == null;
    }
    @Test
    public void testTransferOfMultipleColumnFamilies() throws Exception
    {
        String keyspace = KEYSPACE_CACHEKEY;
        IPartitioner p = Util.testPartitioner();
        String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" };
        List<SSTableReader> ssTableReaders = new ArrayList<>();
        NavigableMap<DecoratedKey,String> keys = new TreeMap<>();
        for (String cf : columnFamilies)
        {
            Set<String> content = new HashSet<>();
            content.add("data-" + cf + "-1");
            content.add("data-" + cf + "-2");
            content.add("data-" + cf + "-3");
            SSTableUtils.Context context = SSTableUtils.prepare().ks(keyspace).cf(cf);
            ssTableReaders.add(context.write(content));
            // collect dks for each string key
            for (String str : content)
                keys.put(Util.dk(str), cf);
        }
        // transfer the first and last keys
        Map.Entry<DecoratedKey,String> first = keys.firstEntry();
        Map.Entry<DecoratedKey,String> last = keys.lastEntry();
        Map.Entry<DecoratedKey,String> secondtolast = keys.lowerEntry(last.getKey());
        List<Range<Token>> ranges = new ArrayList<>();
        ranges.add(new Range<>(p.getMinimumToken(), first.getKey().getToken()));
        // the left hand side of the range is exclusive, so we transfer from the second-to-last token
        ranges.add(new Range<>(secondtolast.getKey().getToken(), p.getMinimumToken()));
        // Acquiring references, transferSSTables needs it
        Refs<SSTableReader> refs = Refs.tryRef(ssTableReaders);
        if (refs == null)
            throw new AssertionError();
        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
        // check that only two keys were transferred
        for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last))
        {
            ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(entry.getValue());
            List<Row> rows = Util.getRangeSlice(store);
            assertEquals(rows.toString(), 1, rows.size());
            assertEquals(entry.getKey(), rows.get(0).key);
        }
    }
    @Test
    public void testRandomSSTableTransfer() throws Exception
    {
        final Keyspace keyspace = Keyspace.open(KEYSPACE1);
        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
        Mutator mutator = new Mutator()
        {
            public void mutate(String key, String colName, long timestamp) throws Exception
            {
                ColumnFamily cf = ArrayBackedSortedColumns.factory.create(keyspace.getName(), cfs.name);
                cf.addColumn(column(colName, "value", timestamp));
                cf.addColumn(new BufferCell(cellname("birthdate"), ByteBufferUtil.bytes(new Date(timestamp).toString()), timestamp));
                Mutation rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes(key), cf);
                logger.debug("Applying row to transfer {}", rm);
                rm.applyUnsafe();
            }
        };
        // write a lot more data so the data is spread in more than 1 chunk.
        for (int i = 1; i <= 6000; i++)
            mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
        cfs.forceBlockingFlush();
        Util.compactAll(cfs, Integer.MAX_VALUE).get();
        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
        cfs.clearUnsafe();
        IPartitioner p = Util.testPartitioner();
        List<Range<Token>> ranges = new ArrayList<>();
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000"))));
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500"))));
        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900"))));
        transfer(sstable, ranges);
        assertEquals(1, cfs.getLiveSSTables().size());
        assertEquals(7, Util.getRangeSlice(cfs).size());
    }
    */

    /**
     * Writes test data for one key. Implementations must create column {@code col} for partition {@code key} at
     * {@code timestamp}, and may also create any other columns they please.
     */
    public interface Mutator
    {
        public void mutate(String key, String col, long timestamp) throws Exception;
    }
}