blob: b823f23dd9d5a782d6235ee3c618ab04e18d35c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.net.UnknownHostException;
import java.util.Random;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.net.Verb.*;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ReadCommandVerbHandlerTest
{
private final static Random random = new Random();
private static ReadCommandVerbHandler handler;
private static TableMetadata metadata;
private static TableMetadata metadata_with_transient;
private static DecoratedKey KEY;
private static final String TEST_NAME = "read_command_vh_test_";
private static final String KEYSPACE = TEST_NAME + "cql_keyspace_replicated";
private static final String KEYSPACE_WITH_TRANSIENT = TEST_NAME + "ks_with_transient";
private static final String TABLE = "table1";
@BeforeClass
public static void init() throws Throwable
{
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition(TEST_NAME);
metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
metadata_with_transient = Schema.instance.getTableMetadata(KEYSPACE_WITH_TRANSIENT, TABLE);
KEY = key(metadata, 1);
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
tmd.updateNormalToken(KEY.getToken(), InetAddressAndPort.getByName("127.0.0.2"));
tmd.updateNormalToken(key(metadata, 2).getToken(), InetAddressAndPort.getByName("127.0.0.3"));
tmd.updateNormalToken(key(metadata, 3).getToken(), FBUtilities.getBroadcastAddressAndPort());
}
@Before
public void setup()
{
MessagingService.instance().inboundSink.clear();
MessagingService.instance().outboundSink.clear();
MessagingService.instance().outboundSink.add((message, to) -> false);
MessagingService.instance().inboundSink.add((message) -> false);
handler = new ReadCommandVerbHandler();
}
@Test
public void setRepairedDataTrackingFlagIfHeaderPresent()
{
TrackingSinglePartitionReadCommand command = new TrackingSinglePartitionReadCommand(metadata);
assertFalse(command.isTrackingRepairedData());
handler.doVerb(Message.builder(READ_REQ, (ReadCommand) command)
.from(peer())
.withFlag(MessageFlag.TRACK_REPAIRED_DATA)
.withId(messageId())
.build());
assertTrue(command.isTrackingRepairedData());
}
@Test
public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
{
TrackingSinglePartitionReadCommand command = new TrackingSinglePartitionReadCommand(metadata);
assertFalse(command.isTrackingRepairedData());
handler.doVerb(Message.builder(READ_REQ, (ReadCommand) command)
.from(peer())
.withId(messageId())
.withParam(ParamType.TRACE_SESSION, nextTimeUUID())
.build());
assertFalse(command.isTrackingRepairedData());
}
@Test
public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
{
TrackingSinglePartitionReadCommand command = new TrackingSinglePartitionReadCommand(metadata);
assertFalse(command.isTrackingRepairedData());
handler.doVerb(Message.builder(READ_REQ, (ReadCommand) command)
.withId(messageId())
.from(peer())
.build());
assertFalse(command.isTrackingRepairedData());
}
@Test (expected = InvalidRequestException.class)
public void rejectsRequestWithNonMatchingTransientness()
{
ReadCommand command = new TrackingSinglePartitionReadCommand(metadata_with_transient);
handler.doVerb(Message.builder(READ_REQ, command)
.from(peer())
.withId(messageId())
.build());
}
private static int messageId()
{
return random.nextInt();
}
private static InetAddressAndPort peer()
{
try
{
return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, 9});
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
}
private static class TrackingSinglePartitionReadCommand extends SinglePartitionReadCommand
{
private boolean trackingRepairedData = false;
TrackingSinglePartitionReadCommand(TableMetadata metadata)
{
super(false,
0,
false,
metadata,
FBUtilities.nowInSeconds(),
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
KEY,
new ClusteringIndexSliceFilter(Slices.ALL, false),
null,
false);
}
@Override
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
trackingRepairedData = executionController.isTrackingRepairedStatus();
return super.executeLocally(executionController);
}
public boolean isTrackingRepairedData()
{
return trackingRepairedData;
}
}
private static DecoratedKey key(TableMetadata metadata, int key)
{
return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key));
}
}