blob: 34f25a175f623fa8d0171f606ed26c113c704c46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.db;
import static org.junit.Assert.*;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.SortedSet;
import java.util.TreeSet;
import com.google.common.base.Predicate;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.commitlog.CommitLogTestReplayer;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
public class ReadMessageTest
{
private static final String KEYSPACE1 = "ReadMessageTest1";
private static final String KEYSPACENOCOMMIT = "ReadMessageTest_NoCommit";
private static final String CF = "Standard1";
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
SimpleStrategy.class,
KSMetaData.optsWithRF(1),
SchemaLoader.standardCFMD(KEYSPACE1, CF));
SchemaLoader.createKeyspace(KEYSPACENOCOMMIT,
false,
true,
SimpleStrategy.class,
KSMetaData.optsWithRF(1),
SchemaLoader.standardCFMD(KEYSPACENOCOMMIT, CF));
}
@Test
public void testMakeReadMessage() throws IOException
{
CellNameType type = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1").getComparator();
SortedSet<CellName> colList = new TreeSet<CellName>(type);
colList.add(Util.cellname("col1"));
colList.add(Util.cellname("col2"));
ReadCommand rm, rm2;
DecoratedKey dk = Util.dk("row1");
long ts = System.currentTimeMillis();
rm = new SliceByNamesReadCommand(KEYSPACE1, dk.getKey(), "Standard1", ts, new NamesQueryFilter(colList));
rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
rm = new SliceFromReadCommand(KEYSPACE1, dk.getKey(), "Standard1", ts, new SliceQueryFilter(Composites.EMPTY, Composites.EMPTY, true, 2));
rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
rm = new SliceFromReadCommand(KEYSPACE1, dk.getKey(), "Standard1", ts, new SliceQueryFilter(Util.cellname("a"), Util.cellname("z"), true, 5));
rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
}
private ReadCommand serializeAndDeserializeReadMessage(ReadCommand rm) throws IOException
{
ReadCommandSerializer rms = ReadCommand.serializer;
DataOutputBuffer out = new DataOutputBuffer();
ByteArrayInputStream bis;
rms.serialize(rm, out, MessagingService.current_version);
bis = new ByteArrayInputStream(out.getData(), 0, out.getLength());
return rms.deserialize(new DataInputStream(bis), MessagingService.current_version);
}
@Test
public void testGetColumn()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
CellNameType type = keyspace.getColumnFamilyStore("Standard1").getComparator();
Mutation rm;
DecoratedKey dk = Util.dk("key1");
// add data
rm = new Mutation(KEYSPACE1, dk.getKey());
rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
rm.apply();
ReadCommand command = new SliceByNamesReadCommand(KEYSPACE1, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
Row row = command.getRow(keyspace);
Cell col = row.cf.getColumn(Util.cellname("Column1"));
assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
}
@Test
public void testNoCommitLog() throws Exception
{
Mutation rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("row"));
rm.add("Standard1", Util.cellname("commit1"), ByteBufferUtil.bytes("abcd"), 0);
rm.apply();
rm = new Mutation(KEYSPACENOCOMMIT, ByteBufferUtil.bytes("row"));
rm.add("Standard1", Util.cellname("commit2"), ByteBufferUtil.bytes("abcd"), 0);
rm.apply();
Checker checker = new Checker();
CommitLogTestReplayer.examineCommitLog(checker);
assertTrue(checker.commitLogMessageFound);
assertFalse(checker.noCommitLogMessageFound);
}
static class Checker implements Predicate<Mutation>
{
boolean commitLogMessageFound = false;
boolean noCommitLogMessageFound = false;
public boolean apply(Mutation mutation)
{
for (ColumnFamily cf : mutation.getColumnFamilies())
{
if (cf.getColumn(Util.cellname("commit1")) != null)
commitLogMessageFound = true;
if (cf.getColumn(Util.cellname("commit2")) != null)
noCommitLogMessageFound = true;
}
return true;
}
}
}