blob: 2542202302fbb045ee7a2efe54eb5d4acf9281e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads.range;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.Iterators;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.CloseableIterator;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.junit.Assert.assertEquals;
public class RangeCommandIteratorTest
{
    private static final String KEYSPACE1 = "RangeCommandIteratorTest";
    private static final String CF_STANDARD1 = "Standard1";

    @BeforeClass
    public static void defineSchema() throws ConfigurationException
    {
        SchemaLoader.prepareServer();
        SchemaLoader.createKeyspace(KEYSPACE1,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
    }

    @Test
    public void testRangeCountWithRangeMerge()
    {
        List<Token> tokens = setTokens(100, 200, 300, 400);
        Keyspace keyspace = Keyspace.open(KEYSPACE1);

        // Build one replica plan per consecutive pair of tokens.
        List<ReplicaPlan.ForRangeRead> plans = new ArrayList<>();
        for (int i = 1; i < tokens.size(); i++)
        {
            Range<PartitionPosition> rowRange = Range.makeRowRange(tokens.get(i - 1), tokens.get(i));
            plans.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, rowRange, 1));
        }
        int vnodeCount = plans.size();

        // This test runs on a single node, so every range shares the same replica and
        // the merger collapses all of them into a single plan.
        ReplicaPlanMerger merger = new ReplicaPlanMerger(plans.iterator(), keyspace, ConsistencyLevel.ONE);
        ReplicaPlan.ForRangeRead merged = Iterators.getOnlyElement(merger);
        assertEquals(vnodeCount, merged.vnodeCount());
    }

    @Test
    public void testRangeQueried()
    {
        List<Token> tokens = setTokens(100, 200, 300, 400);
        int vnodeCount = tokens.size() + 1; // n tokens divide the token ring into n+1 ranges

        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
        cfs.clearUnsafe();

        int rows = 100;
        populateRows(cfs, rows);

        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
        AbstractBounds<PartitionPosition> keyRange = command.dataRange().keyRange();

        // No merger, initial concurrency 1: 2 batches — 1 range first, then the remaining ranges.
        CloseableIterator<ReplicaPlan.ForRangeRead> plans = replicaPlanIterator(keyRange, keyspace, false);
        RangeCommandIterator iterator = new RangeCommandIterator(plans, command, 1, 1000, vnodeCount, nanoTime());
        verifyRangeCommandIterator(iterator, rows, 2, vnodeCount);

        // No merger, initial concurrency = vnodeCount: all vnode ranges fit in a single batch.
        plans = replicaPlanIterator(keyRange, keyspace, false);
        iterator = new RangeCommandIterator(plans, command, vnodeCount, 1000, vnodeCount, nanoTime());
        verifyRangeCommandIterator(iterator, rows, 1, vnodeCount);

        // No merger, max concurrency 1: one batch per vnode range.
        plans = replicaPlanIterator(keyRange, keyspace, false);
        iterator = new RangeCommandIterator(plans, command, 1, 1, vnodeCount, nanoTime());
        verifyRangeCommandIterator(iterator, rows, vnodeCount, vnodeCount);

        // With merger: every range shares the same replica (localhost), so only 1 batch is requested.
        plans = replicaPlanIterator(keyRange, keyspace, true);
        iterator = new RangeCommandIterator(plans, command, 1, 1000, vnodeCount, nanoTime());
        verifyRangeCommandIterator(iterator, rows, 1, vnodeCount);

        // With merger and max concurrency 1: still 1 batch, for the same reason.
        plans = replicaPlanIterator(keyRange, keyspace, true);
        iterator = new RangeCommandIterator(plans, command, 1, 1, vnodeCount, nanoTime());
        verifyRangeCommandIterator(iterator, rows, 1, vnodeCount);
    }

    @Test
    public void testComputeConcurrencyFactor()
    {
        int maxConcurrentRangeRequest = 32;

        // No live rows returned: fetch all remaining ranges, but hit the max first.
        int factor = RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
        assertEquals(maxConcurrentRangeRequest, factor); // 100 - 30 = 70 > maxConcurrentRangeRequest

        // No live rows returned: fetch all remaining ranges.
        factor = RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
        assertEquals(20, factor); // 100 - 80 = 20 < maxConcurrentRangeRequest

        // Half the rows returned: fetch rangesQueried again, but hit the max first.
        factor = RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
        assertEquals(maxConcurrentRangeRequest, factor); // 60 > maxConcurrentRangeRequest

        // Half the rows returned: fetch rangesQueried again.
        factor = RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
        assertEquals(30, factor); // 30 < maxConcurrentRangeRequest

        // Most rows returned: only 1 more range to fetch.
        factor = RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
        assertEquals(1, factor); // 1 < maxConcurrentRangeRequest
    }

    /** Assigns the given token values to the local node and returns the resulting tokens. */
    private static List<Token> setTokens(int... values)
    {
        return new TokenUpdater().withKeys(values).update().getTokens();
    }

    /** Writes {@code rows} rows (one clustering {@code "c"} each) into {@code cfs} and flushes. */
    private static void populateRows(ColumnFamilyStore cfs, int rows)
    {
        for (int i = 0; i < rows; i++)
        {
            new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i))
                .clustering("c")
                .add("val", String.valueOf(i))
                .build()
                .applyUnsafe();
        }
        Util.flush(cfs);
    }

    /** Returns the replica plans covering {@code keyRange}, optionally wrapped in a range merger. */
    private static CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlanIterator(AbstractBounds<PartitionPosition> keyRange,
                                                                                   Keyspace keyspace,
                                                                                   boolean withRangeMerger)
    {
        ReplicaPlanIterator raw = new ReplicaPlanIterator(keyRange, keyspace, ConsistencyLevel.ONE);
        return withRangeMerger ? new ReplicaPlanMerger(raw, keyspace, ConsistencyLevel.ONE) : raw;
    }

    /** Drains {@code data} and checks row count, batches requested, and vnode ranges queried. */
    private static void verifyRangeCommandIterator(RangeCommandIterator data, int rows, int batches, int vnodeCount)
    {
        assertEquals(rows, Util.size(data));
        assertEquals(batches, data.batchesRequested());
        assertEquals(vnodeCount, data.rangesQueried());
    }
}